# Repo-native helper for domain-case orchestration.
#
# Two sub-commands are exposed (see ``build_parser``):
#   * ``run-case``      - start one ``assistant_stage1`` eval job on the backend,
#                         wait for it, and save the resulting artifacts.
#   * ``import-export`` - parse an existing technical export markdown file and
#                         build the same artifact bundle from it.
from __future__ import annotations

import argparse
import json
import re
import sys
import textwrap
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

REPO_ROOT = Path(__file__).resolve().parent.parent
DEFAULT_ARTIFACTS_ROOT = REPO_ROOT / "artifacts" / "domain_runs"
DEFAULT_SESSIONS_DIR = REPO_ROOT / "llm_normalizer" / "data" / "assistant_sessions"
DEFAULT_REPORTS_DIR = REPO_ROOT / "llm_normalizer" / "reports"
DEFAULT_BACKEND_URL = "http://127.0.0.1:8787"
TECH_SECTION_HEADER = "### technical_debug_payload_json"

# Patterns used by ``sanitize_export_text``; compiled once at import time.
_DEBUG_HEADING_RE = re.compile(
    r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json|debug_payload|technical_breakdown)\b",
    re.IGNORECASE,
)
_DEBUG_FENCED_SECTION_RE = re.compile(
    r"###\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)[\s\S]*?(?:```[\s\S]*?```|$)",
    re.IGNORECASE,
)
_DEBUG_TRAILING_SECTION_RE = re.compile(
    r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)\b[\s\S]*$",
    re.IGNORECASE,
)
_INLINE_DEBUG_PATTERNS = (
    re.compile(r"\b(?:debug_payload_json|technical_breakdown_json)\b", re.IGNORECASE),
    re.compile(r"\b(?:route_summary|semantic_profile|domain_scope|relation_patterns|account_scope)\b", re.IGNORECASE),
    re.compile(r"\b(?:coverage_report|retrieval_status|problem_unit_state|candidate_evidence)\b", re.IGNORECASE),
    re.compile(r"\b(?:graph_domain_scope|graph_runtime|selection_reason|why_included)\b", re.IGNORECASE),
)

# Keys copied from every raw message record into the normalized conversation.
_MESSAGE_KEYS = ("message_id", "role", "text", "reply_type", "created_at", "trace_id", "debug")

# The template is dedented BEFORE the values are substituted so that multi-line
# questions cannot disturb the common-indentation detection.
# NOTE(review): blank-line layout reconstructed from a whitespace-collapsed
# source - confirm against a previously generated case_brief.md.
_CASE_BRIEF_TEMPLATE = textwrap.dedent(
    """\
    # Case brief

    ## Domain
    `{domain}`

    ## Raw user question
    `{question}`

    ## Expected business meaning
    -

    ## Expected capability
    - {expected_capability}

    ## Expected result mode
    - {expected_result_mode}

    ## Constraints
    - no architecture changes
    - 1C/MCP first
    - no fabricated values
    - heuristic is not product success
    - accepted requires analyst quality score >= 80 and zero unresolved P0

    ## Known current behavior
    -

    ## Draft acceptance criteria
    -
    """
)


def dump_json(payload: Any) -> str:
    """Serialize *payload* as indented JSON, keeping non-ASCII characters as-is."""
    return json.dumps(payload, ensure_ascii=False, indent=2)


def write_text(file_path: Path, text: str) -> None:
    """Write *text* to *file_path* as UTF-8 with LF newlines, creating parent dirs."""
    file_path.parent.mkdir(parents=True, exist_ok=True)
    # ``Path.write_text(newline=...)`` only exists on Python 3.10+; ``open``
    # gives the same result on every supported interpreter.
    with file_path.open("w", encoding="utf-8", newline="\n") as handle:
        handle.write(text)


def write_json(file_path: Path, payload: Any) -> None:
    """Write *payload* as pretty-printed JSON followed by a trailing newline."""
    write_text(file_path, dump_json(payload) + "\n")


def sanitize_export_text(value: str) -> str:
    """Strip debug sections and debug-looking lines from a message text.

    Everything from the first debug heading onwards is dropped, remaining
    debug sections are removed, and then blank lines and lines mentioning any
    of the known debug field names are filtered out.
    """
    raw = str(value or "")
    heading = _DEBUG_HEADING_RE.search(raw)
    visible = raw[: heading.start()] if heading else raw
    visible = _DEBUG_FENCED_SECTION_RE.sub("", visible)
    visible = _DEBUG_TRAILING_SECTION_RE.sub("", visible)

    kept_lines: list[str] = []
    for line in visible.splitlines():
        cleaned = line.rstrip()
        if not cleaned.strip():
            continue
        if any(pattern.search(cleaned) for pattern in _INLINE_DEBUG_PATTERNS):
            continue
        kept_lines.append(cleaned)
    return "\n".join(kept_lines).strip()


def build_conversation_export(
    session_id: str,
    conversation: list[dict[str, Any]],
    mode: str = "technical",
) -> str:
    """Render *conversation* as export markdown.

    In ``technical`` mode each assistant message that carries a ``debug``
    payload gets an extra ``TECH_SECTION_HEADER`` section with fenced JSON.
    """
    include_debug = mode == "technical"
    exported_at = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
    lines = [
        "# Assistant conversation export",
        f"session_id: {session_id or 'n/a'}",
        f"export_mode: {mode}",
        f"exported_at: {exported_at}",
        "",
    ]
    for index, item in enumerate(conversation, start=1):
        safe_text = sanitize_export_text(str(item.get("text") or ""))
        lines.append(f"## {index}. {item.get('role', 'unknown')}")
        lines.append(f"message_id: {item.get('message_id') or 'n/a'}")
        lines.append(f"created_at: {item.get('created_at') or 'n/a'}")
        lines.append(f"reply_type: {item.get('reply_type') or 'n/a'}")
        trace_id = item.get("trace_id")
        if trace_id:
            lines.append(f"trace_id: {trace_id}")
        lines.extend(["", safe_text or "(empty)", ""])
        if include_debug and item.get("role") == "assistant" and item.get("debug") is not None:
            lines.extend([TECH_SECTION_HEADER, "```json", dump_json(item["debug"]), "```", ""])
    return "\n".join(lines)


def slugify_case_id(domain: str, explicit_case_id: str | None) -> str:
    """Return the explicit case id (stripped) or a ``<domain>_<UTC timestamp>`` slug."""
    if explicit_case_id:
        normalized = explicit_case_id.strip()
        if normalized:
            return normalized
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    # "Ёё" sit outside the А-я code-point range and must be listed explicitly,
    # otherwise they are replaced by "_" like punctuation.
    cleaned_domain = re.sub(r"[^0-9A-Za-zА-Яа-яЁё_-]+", "_", domain.strip(), flags=re.UNICODE).strip("_")
    return f"{cleaned_domain or 'domain_case'}_{timestamp}"


def http_json(
    url: str,
    *,
    method: str = "GET",
    payload: dict[str, Any] | None = None,
    timeout: int = 30,
) -> dict[str, Any]:
    """Perform an HTTP request and return the decoded JSON object.

    Raises:
        RuntimeError: on an HTTP error status, on any transport failure, or
            when the response body is not a UTF-8 encoded JSON object.
    """
    data = None
    headers = {"Accept": "application/json"}
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json; charset=utf-8"
    request = Request(url, data=data, method=method, headers=headers)
    try:
        with urlopen(request, timeout=timeout) as response:
            raw_body = response.read()
    except HTTPError as error:
        detail = error.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {error.code} for {url}: {detail}") from error
    except (URLError, OSError) as error:
        # OSError also covers socket timeouts/resets raised while reading the
        # body, which urllib does not wrap into URLError.
        raise RuntimeError(f"Failed to reach backend at {url}: {error}") from error
    try:
        # UnicodeDecodeError and JSONDecodeError are both ValueError subclasses.
        parsed = json.loads(raw_body.decode("utf-8"))
    except ValueError as error:
        raise RuntimeError(f"Backend returned non-JSON payload for {url}") from error
    if not isinstance(parsed, dict):
        # Callers use ``.get`` on the result, so reject arrays/scalars up front.
        raise RuntimeError(f"Backend returned non-object JSON payload for {url}")
    return parsed


def wait_for_job(
    backend_url: str,
    job_id: str,
    timeout_seconds: int,
    poll_interval_seconds: float,
) -> dict[str, Any]:
    """Poll the async-job endpoint until the job is ``completed`` or ``failed``.

    The job is always polled at least once, even with a non-positive timeout.

    Raises:
        RuntimeError: when a response does not contain a ``job`` object.
        TimeoutError: when the job has not finished within *timeout_seconds*.
    """
    status_url = f"{backend_url.rstrip('/')}/api/eval/run-async/{job_id}"
    # Monotonic clock: the deadline must not move when the wall clock is adjusted.
    deadline = time.monotonic() + timeout_seconds
    last_status = None
    while True:
        response = http_json(status_url)
        job = response.get("job")
        if not isinstance(job, dict):
            raise RuntimeError("Async job response does not contain `job` object")
        status = str(job.get("status") or "unknown")
        if status != last_status:
            print(f"[domain-case-loop] job {job_id}: {status}")
            last_status = status
        if status in {"completed", "failed"}:
            return job
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(poll_interval_seconds, remaining))
    raise TimeoutError(f"Async job {job_id} did not finish within {timeout_seconds} seconds")


def wait_for_file(file_path: Path, timeout_seconds: int = 30) -> None:
    """Block until *file_path* exists, checking roughly every half second.

    Raises:
        FileNotFoundError: when the file has not appeared within the timeout.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        if file_path.exists():
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(0.5, remaining))
    raise FileNotFoundError(f"Timed out waiting for file: {file_path}")


def read_json_file(file_path: Path) -> dict[str, Any]:
    """Load a JSON file, tolerating a UTF-8 byte-order mark."""
    return json.loads(file_path.read_text(encoding="utf-8-sig"))


def _normalize_message(item: dict[str, Any]) -> dict[str, Any]:
    """Project a raw message record onto the fixed conversation schema."""
    message = {key: item.get(key) for key in _MESSAGE_KEYS}
    message["text"] = item.get("text") or ""
    return message


def extract_conversation_from_session(session_record: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the normalized message list stored in a session record.

    The top-level ``conversation`` list is preferred. When it is missing or
    yields no usable messages, messages are rebuilt from the
    ``user_message`` / ``assistant_message`` entries of each turn's
    ``technical_json``.
    """
    conversation = session_record.get("conversation")
    if isinstance(conversation, list) and conversation:
        direct = [_normalize_message(item) for item in conversation if isinstance(item, dict)]
        if direct:
            return direct

    turns = session_record.get("turns")
    if not isinstance(turns, list):
        return []

    rebuilt: list[dict[str, Any]] = []
    for turn in turns:
        if not isinstance(turn, dict):
            continue
        technical_json = turn.get("technical_json")
        if not isinstance(technical_json, dict):
            continue
        for item in (technical_json.get("user_message"), technical_json.get("assistant_message")):
            if isinstance(item, dict):
                rebuilt.append(_normalize_message(item))
    return rebuilt


def find_last_assistant(conversation: list[dict[str, Any]]) -> dict[str, Any]:
    """Return the last message whose role is ``assistant``.

    Raises:
        RuntimeError: when the conversation has no assistant message.
    """
    for item in reversed(conversation):
        if item.get("role") == "assistant":
            return item
    raise RuntimeError("Conversation does not contain assistant message")


def find_last_user_before(
    conversation: list[dict[str, Any]],
    assistant_message_id: Any,
) -> dict[str, Any] | None:
    """Return the closest user message preceding the given assistant message.

    Message ids are not guaranteed to be unique: re-imported exports carry the
    literal ``n/a`` for every message without an id, and ids may be ``None``.
    The anchor is therefore the LAST assistant message with a matching id,
    which is the one ``find_last_assistant`` returns. If no assistant message
    matches, the first message of any role with that id is used; if nothing
    matches, the whole conversation is searched.
    """
    cut = len(conversation)
    for index in range(len(conversation) - 1, -1, -1):
        item = conversation[index]
        if item.get("role") == "assistant" and item.get("message_id") == assistant_message_id:
            cut = index
            break
    else:
        for index, item in enumerate(conversation):
            if item.get("message_id") == assistant_message_id:
                cut = index
                break

    for item in reversed(conversation[:cut]):
        if item.get("role") == "user":
            return item
    return None


def extract_last_turn(session_record: dict[str, Any]) -> dict[str, Any] | None:
    """Return the last entry of ``turns`` when it is a dict, otherwise ``None``."""
    turns = session_record.get("turns")
    if not isinstance(turns, list) or not turns:
        return None
    last_turn = turns[-1]
    return last_turn if isinstance(last_turn, dict) else None


def extract_report_case(report_record: dict[str, Any], case_id: str) -> dict[str, Any] | None:
    """Return the first entry of ``results`` whose ``case_id`` equals *case_id*."""
    results = report_record.get("results")
    if not isinstance(results, list):
        return None
    for item in results:
        if isinstance(item, dict) and str(item.get("case_id") or "") == case_id:
            return item
    return None


def build_turn_artifact(
    *,
    slot: str,
    domain: str,
    case_id: str,
    question: str | None,
    session_id: str,
    conversation: list[dict[str, Any]],
    session_record: dict[str, Any] | None,
    job_record: dict[str, Any] | None,
    report_case: dict[str, Any] | None,
    export_file_name: str,
) -> dict[str, Any]:
    """Assemble the ``domain_case_turn_artifact_v1`` dict for the last turn.

    When *question* is empty, the text of the user message preceding the last
    assistant message is used instead.

    Raises:
        RuntimeError: when *conversation* has no assistant message.
    """
    last_assistant = find_last_assistant(conversation)
    last_user = find_last_user_before(conversation, last_assistant.get("message_id"))
    final_question = question or (last_user.get("text") if isinstance(last_user, dict) else None)

    # Missing records behave like empty dicts: every looked-up field is None.
    job = job_record if isinstance(job_record, dict) else {}
    session = session_record if isinstance(session_record, dict) else {}

    return {
        "schema_version": "domain_case_turn_artifact_v1",
        "artifact_slot": slot,
        "domain": domain,
        "case_id": case_id,
        "question": final_question,
        "session_id": session_id,
        "run": {
            "job_id": job.get("job_id"),
            "run_id": job.get("run_id"),
            "analysis_date": job.get("analysis_date"),
            "report_case_available": report_case is not None,
        },
        "user_message": last_user,
        "assistant_message": last_assistant,
        "technical_debug_payload": last_assistant.get("debug"),
        "session_summary": {
            "schema_version": session.get("schema_version"),
            "updated_at": session.get("updated_at"),
            "trace_ids": session.get("trace_ids"),
            "reply_types": session.get("reply_types"),
            "investigation_state": session.get("investigation_state"),
            "address_navigation_state": session.get("address_navigation_state"),
            "last_turn": extract_last_turn(session_record or {}),
        },
        "report_case": report_case,
        "export_markdown_file": export_file_name,
    }


def ensure_case_brief(
    case_dir: Path,
    *,
    domain: str,
    question: str | None,
    expected_capability: str | None,
    expected_result_mode: str | None,
) -> None:
    """Create ``case_brief.md`` in *case_dir* unless it already exists."""
    file_path = case_dir / "case_brief.md"
    if file_path.exists():
        # Never overwrite a brief that may already have been edited by hand.
        return
    content = _CASE_BRIEF_TEMPLATE.format(
        domain=domain,
        question=question or "",
        expected_capability=expected_capability or "",
        expected_result_mode=expected_result_mode or "",
    )
    write_text(file_path, content)


def save_capture_bundle(
    *,
    case_dir: Path,
    slot: str,
    export_markdown: str,
    debug_payload: Any,
    turn_artifact: dict[str, Any],
    session_record: dict[str, Any] | None,
    job_record: dict[str, Any] | None,
    report_case: dict[str, Any] | None,
) -> None:
    """Write the ``<slot>_*`` artifact files for one captured turn.

    The output, debug and turn files are always written (a missing debug
    payload is stored as ``{}``); session, job and report-case files are only
    written when the corresponding record is provided.
    """
    write_text(case_dir / f"{slot}_output.md", export_markdown)
    write_json(case_dir / f"{slot}_debug.json", debug_payload if debug_payload is not None else {})
    write_json(case_dir / f"{slot}_turn.json", turn_artifact)
    if session_record is not None:
        write_json(case_dir / f"{slot}_session.json", session_record)
    if job_record is not None:
        write_json(case_dir / f"{slot}_job.json", job_record)
    if report_case is not None:
        write_json(case_dir / f"{slot}_report_case.json", report_case)


def parse_metadata_line(line: str) -> tuple[str, str] | None:
    """Split ``key: value`` on the first colon; return ``None`` without a colon."""
    if ":" not in line:
        return None
    key, value = line.split(":", 1)
    return key.strip(), value.strip()


def parse_export_markdown(text: str) -> tuple[str, list[dict[str, Any]]]:
    """Parse export markdown back into ``(session_id, conversation)``.

    Each ``## <n>. user|assistant`` section is read as a run of
    ``key: value`` metadata lines, a blank line, the message text and an
    optional ``TECH_SECTION_HEADER`` section holding the debug JSON.

    Raises:
        RuntimeError: when no conversation section is found.
        json.JSONDecodeError: when a debug section holds invalid JSON.
    """
    session_id = "n/a"
    session_match = re.search(r"^session_id:\s*(.+?)\s*$", text, flags=re.MULTILINE)
    if session_match:
        session_id = session_match.group(1).strip()

    section_pattern = re.compile(r"^##\s+\d+\.\s+(user|assistant)\s*$", flags=re.MULTILINE)
    sections = list(section_pattern.finditer(text))
    header_marker = TECH_SECTION_HEADER.lower()
    conversation: list[dict[str, Any]] = []

    for index, match in enumerate(sections):
        role = match.group(1)
        start = match.end()
        end = sections[index + 1].start() if index + 1 < len(sections) else len(text)
        lines = text[start:end].lstrip("\r\n").splitlines()

        # Metadata runs until the first blank line (consumed) or the first
        # line that does not look like ``key: value`` (left for the body).
        metadata: dict[str, Any] = {"role": role}
        cursor = 0
        while cursor < len(lines):
            line = lines[cursor]
            if not line.strip():
                cursor += 1
                break
            meta = parse_metadata_line(line)
            if not meta:
                break
            key, value = meta
            metadata[key] = value
            cursor += 1

        body_lines = lines[cursor:]
        debug_payload = None
        debug_start = None
        for body_index, line in enumerate(body_lines):
            if line.strip().lower() == header_marker:
                debug_start = body_index
                break

        if debug_start is not None:
            body_text_lines = body_lines[:debug_start]
            debug_text = "\n".join(body_lines[debug_start + 1 :]).strip()
            fenced = re.search(r"```json\s*(.*?)\s*```", debug_text, flags=re.DOTALL | re.IGNORECASE)
            if fenced:
                debug_text = fenced.group(1).strip()
            if debug_text:
                debug_payload = json.loads(debug_text)
        else:
            body_text_lines = body_lines

        conversation.append(
            {
                "message_id": metadata.get("message_id"),
                "role": role,
                "text": "\n".join(body_text_lines).strip(),
                "reply_type": metadata.get("reply_type"),
                "created_at": metadata.get("created_at"),
                "trace_id": metadata.get("trace_id"),
                "debug": debug_payload,
            }
        )

    if not conversation:
        raise RuntimeError("Could not parse conversation sections from export markdown")
    return session_id, conversation


def handle_run_case(args: argparse.Namespace) -> int:
    """Run one question through the backend and save the captured artifacts."""
    case_id = slugify_case_id(args.domain, args.case_id)
    case_dir = Path(args.output_root).resolve() / case_id
    case_dir.mkdir(parents=True, exist_ok=True)
    ensure_case_brief(
        case_dir,
        domain=args.domain,
        question=args.question,
        expected_capability=args.expected_capability,
        expected_result_mode=args.expected_result_mode,
    )

    payload: dict[str, Any] = {
        "eval_target": "assistant_stage1",
        "questions": [args.question],
        "useMock": bool(args.use_mock),
        "mode": "standard",
    }
    if args.analysis_date:
        payload["analysis_date"] = args.analysis_date

    # Tolerate a trailing slash so the request path never starts with "//".
    backend_url = str(args.backend_url).rstrip("/")
    start_response = http_json(f"{backend_url}/api/eval/run-async/start", method="POST", payload=payload)
    job = start_response.get("job")
    if not isinstance(job, dict):
        raise RuntimeError("Async start response does not contain `job` object")
    job_id = str(job.get("job_id") or "")
    if not job_id:
        raise RuntimeError("Async start response does not contain job_id")

    final_job = wait_for_job(backend_url, job_id, args.timeout_seconds, args.poll_interval_seconds)
    if str(final_job.get("status") or "") != "completed":
        raise RuntimeError(f"Async job did not complete successfully: {final_job.get('status')}")

    run_id = str(final_job.get("run_id") or "")
    if not run_id:
        # Without a run id the session file name cannot be derived; fail now
        # instead of timing out while waiting for "-AUTO-001.json".
        raise RuntimeError("Completed async job does not contain run_id")

    report_case_id = "AUTO-001"
    session_id = f"{run_id}-{report_case_id}"
    session_file = Path(args.sessions_dir).resolve() / f"{session_id}.json"
    wait_for_file(session_file)
    session_record = read_json_file(session_file)
    conversation = extract_conversation_from_session(session_record)
    export_markdown = build_conversation_export(session_id, conversation, mode="technical")

    report_case = None
    report_file = Path(args.reports_dir).resolve() / f"{run_id}.json"
    if report_file.exists():
        report_record = read_json_file(report_file)
        report_case = extract_report_case(report_record, report_case_id)

    turn_artifact = build_turn_artifact(
        slot=args.slot,
        domain=args.domain,
        case_id=case_id,
        question=args.question,
        session_id=session_id,
        conversation=conversation,
        session_record=session_record,
        job_record=final_job,
        report_case=report_case,
        export_file_name=f"{args.slot}_output.md",
    )
    save_capture_bundle(
        case_dir=case_dir,
        slot=args.slot,
        export_markdown=export_markdown,
        debug_payload=turn_artifact.get("technical_debug_payload"),
        turn_artifact=turn_artifact,
        session_record=session_record,
        job_record=final_job,
        report_case=report_case,
    )
    print(f"[domain-case-loop] saved {args.slot} artifacts to {case_dir}")
    print(f"[domain-case-loop] session_id={session_id}")
    return 0


def handle_import_export(args: argparse.Namespace) -> int:
    """Build an artifact bundle from an existing technical export markdown file."""
    export_text = Path(args.input).read_text(encoding="utf-8-sig")
    session_id, conversation = parse_export_markdown(export_text)
    case_id = slugify_case_id(args.domain, args.case_id)
    case_dir = Path(args.output_root).resolve() / case_id
    case_dir.mkdir(parents=True, exist_ok=True)

    last_assistant = find_last_assistant(conversation)
    last_user = find_last_user_before(conversation, last_assistant.get("message_id"))
    question = args.question or (last_user.get("text") if isinstance(last_user, dict) else None)

    ensure_case_brief(
        case_dir,
        domain=args.domain,
        question=question,
        expected_capability=args.expected_capability,
        expected_result_mode=args.expected_result_mode,
    )
    turn_artifact = build_turn_artifact(
        slot=args.slot,
        domain=args.domain,
        case_id=case_id,
        question=question,
        session_id=session_id,
        conversation=conversation,
        session_record=None,
        job_record=None,
        report_case=None,
        export_file_name=f"{args.slot}_output.md",
    )
    save_capture_bundle(
        case_dir=case_dir,
        slot=args.slot,
        export_markdown=export_text,
        debug_payload=last_assistant.get("debug"),
        turn_artifact=turn_artifact,
        session_record=None,
        job_record=None,
        report_case=None,
    )
    print(f"[domain-case-loop] imported {args.slot} artifacts to {case_dir}")
    return 0


def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with the ``run-case`` and ``import-export`` commands."""
    parser = argparse.ArgumentParser(description="Repo-native helper for NDC_1C domain-case orchestration")
    subparsers = parser.add_subparsers(dest="command", required=True)

    run_case = subparsers.add_parser(
        "run-case",
        help="Run one assistant_stage1 case through the existing backend and save artifacts",
    )
    run_case.add_argument("--domain", required=True)
    run_case.add_argument("--question", required=True)
    run_case.add_argument("--case-id")
    run_case.add_argument("--slot", default="baseline", choices=["baseline", "rerun"])
    run_case.add_argument("--analysis-date")
    run_case.add_argument("--backend-url", default=DEFAULT_BACKEND_URL)
    run_case.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
    run_case.add_argument("--sessions-dir", default=str(DEFAULT_SESSIONS_DIR))
    run_case.add_argument("--reports-dir", default=str(DEFAULT_REPORTS_DIR))
    run_case.add_argument("--timeout-seconds", type=int, default=300)
    run_case.add_argument("--poll-interval-seconds", type=float, default=1.5)
    run_case.add_argument("--expected-capability")
    run_case.add_argument("--expected-result-mode")
    run_case.add_argument("--use-mock", action="store_true")
    run_case.set_defaults(func=handle_run_case)

    import_export = subparsers.add_parser(
        "import-export",
        help="Import an existing technical export markdown and build artifacts",
    )
    import_export.add_argument("--domain", required=True)
    import_export.add_argument("--input", required=True)
    import_export.add_argument("--question")
    import_export.add_argument("--case-id")
    import_export.add_argument("--slot", default="baseline", choices=["baseline", "rerun"])
    import_export.add_argument("--output-root", default=str(DEFAULT_ARTIFACTS_ROOT))
    import_export.add_argument("--expected-capability")
    import_export.add_argument("--expected-result-mode")
    import_export.set_defaults(func=handle_import_export)
    return parser


def main() -> int:
    """CLI entry point; returns the process exit code (1 on any handled error)."""
    parser = build_parser()
    args = parser.parse_args()
    try:
        return int(args.func(args))
    except Exception as error:  # noqa: BLE001
        # Top-level boundary: report the failure on stderr instead of a traceback.
        print(f"[domain-case-loop] error: {error}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())