NODEDC_1C/scripts/domain_case_loop.py

623 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import argparse
import json
import re
import sys
import textwrap
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# Parent of the directory that contains this script (resolved to an absolute path).
REPO_ROOT = Path(__file__).resolve().parent.parent
# Default filesystem locations used by the CLI, all relative to REPO_ROOT.
DEFAULT_ARTIFACTS_ROOT = REPO_ROOT.joinpath("artifacts", "domain_runs")
DEFAULT_SESSIONS_DIR = REPO_ROOT.joinpath("llm_normalizer", "data", "assistant_sessions")
DEFAULT_REPORTS_DIR = REPO_ROOT.joinpath("llm_normalizer", "reports")
# Backend endpoint and LLM settings used as CLI defaults for `run-case`;
# the LLM values are forwarded in the `normalizeConfig` request payload.
DEFAULT_BACKEND_URL = "http://127.0.0.1:8787"
DEFAULT_PROMPT_VERSION = "address_query_runtime_v1"
DEFAULT_LLM_PROVIDER = "local"
DEFAULT_LLM_MODEL = "qwen2.5-14b-instruct-1m"
DEFAULT_LLM_BASE_URL = "http://127.0.0.1:1234/v1"
DEFAULT_LLM_API_KEY = ""
DEFAULT_TEMPERATURE = 0.0
DEFAULT_MAX_OUTPUT_TOKENS = 900
# Markdown heading that introduces the fenced JSON debug payload in a technical
# export; the exporter writes it and the export parser searches for it.
TECH_SECTION_HEADER = "### technical_debug_payload_json"
def dump_json(payload: Any) -> str:
    """Serialize *payload* to a human-readable JSON string.

    Non-ASCII characters are written verbatim (not ``\\u`` escaped) and the
    output is indented by two spaces.
    """
    formatting = {"ensure_ascii": False, "indent": 2}
    return json.dumps(payload, **formatting)
def write_text(file_path: Path, text: str) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(text, encoding="utf-8", newline="\n")
def write_json(file_path: Path, payload: Any) -> None:
    """Write *payload* to *file_path* as pretty-printed JSON plus a trailing newline."""
    serialized = dump_json(payload)
    write_text(file_path, f"{serialized}\n")
def sanitize_export_text(value: str) -> str:
    """Remove debug sections and internal diagnostic lines from message text.

    Steps:
    1. Cut everything from the first line-leading heading (0-6 ``#``) that
       names a debug/technical payload (``debug_payload_json``,
       ``technical_breakdown_json``, ``route_summary_json``, ``debug_payload``,
       ``technical_breakdown``).
    2. Remove any remaining ``### <payload>`` section (up to its fenced block
       or end of text) and any trailing payload heading through end of text.
    3. Drop blank lines and lines mentioning internal field names such as
       ``route_summary`` or ``coverage_report``.
    4. Strip trailing whitespace from each kept line.

    ``None``/empty input yields ``""``.
    """
    text = str(value or "")
    heading = re.search(
        r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json|debug_payload|technical_breakdown)\b",
        text,
        flags=re.IGNORECASE,
    )
    if heading is not None:
        text = text[: heading.start()]
    # Mid-line "### <payload>" headings are not caught by the line-leading cut above.
    text = re.sub(
        r"###\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)[\s\S]*?(?:```[\s\S]*?```|$)",
        "",
        text,
        flags=re.IGNORECASE,
    )
    text = re.sub(
        r"(?:^|\n)\s*#{0,6}\s*(?:debug_payload_json|technical_breakdown_json|route_summary_json)\b[\s\S]*$",
        "",
        text,
        flags=re.IGNORECASE,
    )
    noise_patterns = (
        re.compile(r"\b(?:debug_payload_json|technical_breakdown_json)\b", re.IGNORECASE),
        re.compile(r"\b(?:route_summary|semantic_profile|domain_scope|relation_patterns|account_scope)\b", re.IGNORECASE),
        re.compile(r"\b(?:coverage_report|retrieval_status|problem_unit_state|candidate_evidence)\b", re.IGNORECASE),
        re.compile(r"\b(?:graph_domain_scope|graph_runtime|selection_reason|why_included)\b", re.IGNORECASE),
    )
    kept_lines = [
        trimmed
        for trimmed in (raw_line.rstrip() for raw_line in text.splitlines())
        if trimmed.strip() and not any(pattern.search(trimmed) for pattern in noise_patterns)
    ]
    return "\n".join(kept_lines).strip()
def build_conversation_export(session_id: str, conversation: list[dict[str, Any]], mode: str = "technical") -> str:
    """Render *conversation* as the markdown export document.

    The document starts with a header (session id, export mode, UTC export
    timestamp with second precision). Each message then becomes a
    ``## <n>. <role>`` section with ``key: value`` metadata lines, a blank line,
    and the sanitized message text (``(empty)`` when nothing remains). In
    ``"technical"`` mode, assistant messages with a non-None ``debug`` value
    also get a fenced JSON block under ``TECH_SECTION_HEADER``.

    Args:
        session_id: Session identifier for the header (``n/a`` when empty).
        conversation: Message dicts; missing or empty fields render as ``n/a``.
        mode: Export mode; only ``"technical"`` includes debug payloads.

    Returns:
        The export markdown as a single string.
    """
    include_debug = mode == "technical"
    exported_at = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
    lines = [
        "# Assistant conversation export",
        f"session_id: {session_id or 'n/a'}",
        f"export_mode: {mode}",
        f"exported_at: {exported_at}",
        "",
    ]
    for index, item in enumerate(conversation, start=1):
        safe_text = sanitize_export_text(str(item.get("text") or ""))
        # Use `or` instead of a .get() default: normalized messages always carry a
        # "role" key that may be None, which would otherwise render as "## 1. None".
        role = item.get("role") or "unknown"
        lines.append(f"## {index}. {role}")
        lines.append(f"message_id: {item.get('message_id') or 'n/a'}")
        lines.append(f"created_at: {item.get('created_at') or 'n/a'}")
        lines.append(f"reply_type: {item.get('reply_type') or 'n/a'}")
        trace_id = item.get("trace_id")
        if trace_id:
            lines.append(f"trace_id: {trace_id}")
        lines.extend(["", safe_text or "(empty)", ""])
        if include_debug and item.get("role") == "assistant" and item.get("debug") is not None:
            lines.extend([TECH_SECTION_HEADER, "```json", dump_json(item["debug"]), "```", ""])
    return "\n".join(lines)
def slugify_case_id(domain: str, explicit_case_id: str | None) -> str:
if explicit_case_id:
normalized = explicit_case_id.strip()
if normalized:
return normalized
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
cleaned_domain = re.sub(r"[^0-9A-Za-zА-Яа-я_-]+", "_", domain.strip(), flags=re.UNICODE).strip("_")
return f"{cleaned_domain or 'domain_case'}_{timestamp}"
def http_json(url: str, *, method: str = "GET", payload: dict[str, Any] | None = None, timeout: int = 30) -> dict[str, Any]:
data = None
headers = {"Accept": "application/json"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json; charset=utf-8"
request = Request(url, data=data, method=method, headers=headers)
try:
with urlopen(request, timeout=timeout) as response:
body = response.read().decode("utf-8")
except HTTPError as error:
detail = error.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {error.code} for {url}: {detail}") from error
except URLError as error:
raise RuntimeError(f"Failed to reach backend at {url}: {error}") from error
try:
return json.loads(body)
except json.JSONDecodeError as error:
raise RuntimeError(f"Backend returned non-JSON payload for {url}") from error
def wait_for_job(backend_url: str, job_id: str, timeout_seconds: int, poll_interval_seconds: float) -> dict[str, Any]:
    """Poll the backend's async-eval endpoint until the job finishes.

    Issues ``GET {backend_url}/api/eval/run-async/{job_id}`` at least once and
    prints a line each time the reported status changes.

    Args:
        backend_url: Base URL of the backend (no trailing slash expected).
        job_id: Identifier returned by the async start call.
        timeout_seconds: Overall time budget; the job is still polled once
            even when this is 0.
        poll_interval_seconds: Pause between polls (never past the deadline).

    Returns:
        The ``job`` object once its status is ``completed`` or ``failed``.

    Raises:
        RuntimeError: if a response lacks a ``job`` object.
        TimeoutError: if the job does not finish within *timeout_seconds*.
    """
    # Monotonic clock: immune to wall-clock adjustments while waiting.
    deadline = time.monotonic() + timeout_seconds
    last_status = None
    while True:
        response = http_json(f"{backend_url}/api/eval/run-async/{job_id}")
        job = response.get("job")
        if not isinstance(job, dict):
            raise RuntimeError("Async job response does not contain `job` object")
        status = str(job.get("status") or "unknown")
        if status != last_status:
            print(f"[domain-case-loop] job {job_id}: {status}")
            last_status = status
        if status in {"completed", "failed"}:
            return job
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(max(0.0, min(poll_interval_seconds, remaining)))
    raise TimeoutError(f"Async job {job_id} did not finish within {timeout_seconds} seconds")
def wait_for_file(file_path: Path, timeout_seconds: int = 30, poll_interval_seconds: float = 0.5) -> None:
    """Block until *file_path* exists or the timeout elapses.

    The file is checked at least once, so an already-present file is accepted
    even with ``timeout_seconds=0``.

    Args:
        file_path: File to wait for.
        timeout_seconds: Overall time budget in seconds.
        poll_interval_seconds: Pause between existence checks (never past the deadline).

    Raises:
        FileNotFoundError: if the file has not appeared before the deadline.
    """
    # Monotonic clock: immune to wall-clock adjustments while waiting.
    deadline = time.monotonic() + timeout_seconds
    while True:
        if file_path.exists():
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(max(0.0, min(poll_interval_seconds, remaining)))
    raise FileNotFoundError(f"Timed out waiting for file: {file_path}")
def read_json_file(file_path: Path) -> dict[str, Any]:
    """Load and return the JSON document stored in *file_path*.

    The file is decoded as UTF-8; a leading byte-order mark is tolerated.
    """
    with file_path.open(encoding="utf-8-sig") as handle:
        return json.load(handle)
def _normalize_message(item: dict[str, Any]) -> dict[str, Any]:
    """Project a raw message dict onto the fixed key set used by exports and artifacts."""
    return {
        "message_id": item.get("message_id"),
        "role": item.get("role"),
        "text": item.get("text") or "",
        "reply_type": item.get("reply_type"),
        "created_at": item.get("created_at"),
        "trace_id": item.get("trace_id"),
        "debug": item.get("debug"),
    }


def extract_conversation_from_session(session_record: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the session's messages as normalized dicts, in order.

    A non-empty ``conversation`` list is preferred; its non-dict entries are
    skipped. When it yields nothing, messages are rebuilt from ``turns``: for
    each dict turn whose ``technical_json`` is a dict, its ``user_message``
    and then its ``assistant_message`` (when they are dicts) are emitted.
    ``text`` is normalized to ``""`` when missing or falsy.

    Returns:
        The normalized messages, or ``[]`` if neither source is usable.
    """
    conversation = session_record.get("conversation")
    if isinstance(conversation, list) and conversation:
        normalized = [_normalize_message(item) for item in conversation if isinstance(item, dict)]
        if normalized:
            return normalized
    turns = session_record.get("turns")
    if not isinstance(turns, list):
        return []
    messages: list[dict[str, Any]] = []
    for turn in turns:
        if not isinstance(turn, dict):
            continue
        technical_json = turn.get("technical_json")
        if not isinstance(technical_json, dict):
            continue
        for item in (technical_json.get("user_message"), technical_json.get("assistant_message")):
            if isinstance(item, dict):
                messages.append(_normalize_message(item))
    return messages
def find_last_assistant(conversation: list[dict[str, Any]]) -> dict[str, Any]:
    """Return the most recent message whose ``role`` is ``assistant``.

    Raises:
        RuntimeError: if the conversation holds no assistant message.
    """
    latest = next(
        (message for message in reversed(conversation) if message.get("role") == "assistant"),
        None,
    )
    if latest is None:
        raise RuntimeError("Conversation does not contain assistant message")
    return latest
def find_last_user_before(conversation: list[dict[str, Any]], assistant_message_id: Any) -> dict[str, Any] | None:
before_assistant: list[dict[str, Any]] = []
for item in conversation:
if item.get("message_id") == assistant_message_id:
break
before_assistant.append(item)
for item in reversed(before_assistant):
if item.get("role") == "user":
return item
return None
def extract_last_turn(session_record: dict[str, Any]) -> dict[str, Any] | None:
turns = session_record.get("turns")
if not isinstance(turns, list) or not turns:
return None
last_turn = turns[-1]
return last_turn if isinstance(last_turn, dict) else None
def extract_report_case(report_record: dict[str, Any], case_id: str) -> dict[str, Any] | None:
results = report_record.get("results")
if not isinstance(results, list):
return None
for item in results:
if isinstance(item, dict) and str(item.get("case_id") or "") == case_id:
return item
return None
def build_turn_artifact(
    *,
    slot: str,
    domain: str,
    case_id: str,
    question: str | None,
    session_id: str,
    conversation: list[dict[str, Any]],
    session_record: dict[str, Any] | None,
    job_record: dict[str, Any] | None,
    report_case: dict[str, Any] | None,
    export_file_name: str,
) -> dict[str, Any]:
    """Assemble the ``domain_case_turn_artifact_v1`` record for the latest turn.

    The turn is the last assistant message plus the user message before it.
    The question is *question* when truthy, otherwise that user message's text.
    Run and session fields are None when *job_record* / *session_record* are
    not dicts.

    Raises:
        RuntimeError: propagated from find_last_assistant when the
            conversation has no assistant message.
    """
    assistant_message = find_last_assistant(conversation)
    user_message = find_last_user_before(conversation, assistant_message.get("message_id"))
    if question:
        final_question = question
    elif isinstance(user_message, dict):
        final_question = user_message.get("text")
    else:
        final_question = None
    last_turn = extract_last_turn(session_record or {})
    # Empty stand-ins make every .get() below yield None for missing records.
    job = job_record if isinstance(job_record, dict) else {}
    session = session_record if isinstance(session_record, dict) else {}
    summary_keys = (
        "schema_version",
        "updated_at",
        "trace_ids",
        "reply_types",
        "investigation_state",
        "address_navigation_state",
    )
    session_summary: dict[str, Any] = {key: session.get(key) for key in summary_keys}
    session_summary["last_turn"] = last_turn
    return {
        "schema_version": "domain_case_turn_artifact_v1",
        "artifact_slot": slot,
        "domain": domain,
        "case_id": case_id,
        "question": final_question,
        "session_id": session_id,
        "run": {
            "job_id": job.get("job_id"),
            "run_id": job.get("run_id"),
            "analysis_date": job.get("analysis_date"),
            "report_case_available": report_case is not None,
        },
        "user_message": user_message,
        "assistant_message": assistant_message,
        "technical_debug_payload": assistant_message.get("debug"),
        "session_summary": session_summary,
        "report_case": report_case,
        "export_markdown_file": export_file_name,
    }
def ensure_case_brief(
case_dir: Path,
*,
domain: str,
question: str | None,
expected_capability: str | None,
expected_result_mode: str | None,
) -> None:
file_path = case_dir / "case_brief.md"
if file_path.exists():
return
content = textwrap.dedent(
f"""\
# Case brief
## Domain
`{domain}`
## Raw user question
`{question or "<fill me>"}`
## Expected business meaning
- <fill me>
## Expected capability
- {expected_capability or "<fill me>"}
## Expected result mode
- {expected_result_mode or "<fill me>"}
## Constraints
- no architecture changes
- 1C/MCP first
- no fabricated values
- heuristic is not product success
- accepted requires analyst quality score >= 80 and zero unresolved P0
## Known current behavior
- <fill me>
## Draft acceptance criteria
- <fill me>
"""
)
write_text(file_path, content)
def save_capture_bundle(
    *,
    case_dir: Path,
    slot: str,
    export_markdown: str,
    debug_payload: Any,
    turn_artifact: dict[str, Any],
    session_record: dict[str, Any] | None,
    job_record: dict[str, Any] | None,
    report_case: dict[str, Any] | None,
) -> None:
    """Persist one capture into *case_dir* using ``<slot>_``-prefixed file names.

    Always written:
    - ``<slot>_output.md``: the export markdown;
    - ``<slot>_debug.json``: the debug payload, or ``{}`` when it is None;
    - ``<slot>_turn.json``: the turn artifact.

    Written only when the record is not None: ``<slot>_session.json``,
    ``<slot>_job.json`` and ``<slot>_report_case.json``.
    """
    write_text(case_dir / f"{slot}_output.md", export_markdown)
    write_json(case_dir / f"{slot}_debug.json", {} if debug_payload is None else debug_payload)
    write_json(case_dir / f"{slot}_turn.json", turn_artifact)
    optional_records = (
        ("session", session_record),
        ("job", job_record),
        ("report_case", report_case),
    )
    for suffix, record in optional_records:
        if record is not None:
            write_json(case_dir / f"{slot}_{suffix}.json", record)
def parse_metadata_line(line: str) -> tuple[str, str] | None:
if ":" not in line:
return None
key, value = line.split(":", 1)
return key.strip(), value.strip()
def parse_export_markdown(text: str) -> tuple[str, list[dict[str, Any]]]:
    """Parse a markdown conversation export back into session id and messages.

    The session id comes from the first ``session_id:`` line (``"n/a"`` if
    absent). Messages are the ``## <n>. user|assistant`` sections. In each
    section:
    - the leading ``key: value`` lines are metadata;
    - the rest is the message text;
    - an optional ``TECH_SECTION_HEADER`` block (preferably a fenced ``json``
      block) holds the debug payload.
    Metadata values are kept as written (e.g. ``"n/a"`` stays a string).

    Returns:
        ``(session_id, conversation)`` with one dict per parsed section.

    Raises:
        RuntimeError: if no message section is found, or a debug block does not
            contain valid JSON (the offending section heading is named).
    """
    session_id = "n/a"
    session_match = re.search(r"^session_id:\s*(.+?)\s*$", text, flags=re.MULTILINE)
    if session_match:
        session_id = session_match.group(1).strip()
    section_pattern = re.compile(r"^##\s+\d+\.\s+(user|assistant)\s*$", flags=re.MULTILINE)
    sections = list(section_pattern.finditer(text))
    conversation: list[dict[str, Any]] = []
    for index, match in enumerate(sections):
        role = match.group(1)
        start = match.end()
        end = sections[index + 1].start() if index + 1 < len(sections) else len(text)
        block = text[start:end].lstrip("\r\n")
        lines = block.splitlines()
        # Metadata runs until the first blank line (which is consumed) or the
        # first line without a colon (which stays part of the body).
        metadata: dict[str, Any] = {"role": role}
        cursor = 0
        while cursor < len(lines):
            line = lines[cursor]
            if not line.strip():
                cursor += 1
                break
            meta = parse_metadata_line(line)
            if not meta:
                break
            key, value = meta
            metadata[key] = value
            cursor += 1
        body_lines = lines[cursor:]
        debug_payload = None
        debug_start = None
        for body_index, line in enumerate(body_lines):
            if line.strip().lower() == TECH_SECTION_HEADER.lower():
                debug_start = body_index
                break
        if debug_start is not None:
            body_text_lines = body_lines[:debug_start]
            debug_text = "\n".join(body_lines[debug_start + 1 :]).strip()
            fenced = re.search(r"```json\s*(.*?)\s*```", debug_text, flags=re.DOTALL | re.IGNORECASE)
            if fenced:
                debug_text = fenced.group(1).strip()
            if debug_text:
                try:
                    debug_payload = json.loads(debug_text)
                except json.JSONDecodeError as error:
                    raise RuntimeError(
                        f"Invalid JSON in {TECH_SECTION_HEADER} block under '{match.group(0).strip()}': {error}"
                    ) from error
        else:
            body_text_lines = body_lines
        conversation.append(
            {
                "message_id": metadata.get("message_id"),
                "role": role,
                "text": "\n".join(body_text_lines).strip(),
                "reply_type": metadata.get("reply_type"),
                "created_at": metadata.get("created_at"),
                "trace_id": metadata.get("trace_id"),
                "debug": debug_payload,
            }
        )
    if not conversation:
        raise RuntimeError("Could not parse conversation sections from export markdown")
    return session_id, conversation
def handle_run_case(args: argparse.Namespace) -> int:
    """Run one question through the backend's async eval API and save its artifacts.

    Steps:
    1. Create the case directory and its brief.
    2. Start an ``assistant_stage1`` async eval job and wait for it.
    3. Load the resulting session file (``<run_id>-AUTO-001.json``) and, if
       present, the run report.
    4. Write the ``<slot>_*`` capture bundle.

    Returns:
        0 on success.

    Raises:
        RuntimeError: on malformed backend responses, a job that does not
            complete, or a completed job without ``run_id``.
        TimeoutError / FileNotFoundError: if the job or session file does not
            appear in time.
    """
    case_id = slugify_case_id(args.domain, args.case_id)
    case_dir = Path(args.output_root).resolve() / case_id
    case_dir.mkdir(parents=True, exist_ok=True)
    ensure_case_brief(
        case_dir,
        domain=args.domain,
        question=args.question,
        expected_capability=args.expected_capability,
        expected_result_mode=args.expected_result_mode,
    )
    # A trailing slash in --backend-url would otherwise produce "//api/..." paths.
    backend_url = str(args.backend_url).rstrip("/")
    payload: dict[str, Any] = {
        "normalizeConfig": {
            "llmProvider": args.llm_provider,
            "apiKey": args.llm_api_key,
            "model": args.llm_model,
            "baseUrl": args.llm_base_url,
            "temperature": args.temperature,
            "maxOutputTokens": args.max_output_tokens,
            "promptVersion": args.prompt_version,
        },
        "eval_target": "assistant_stage1",
        "questions": [args.question],
        "useMock": bool(args.use_mock),
        "mode": "standard",
    }
    if args.analysis_date:
        payload["analysis_date"] = args.analysis_date
    start_response = http_json(f"{backend_url}/api/eval/run-async/start", method="POST", payload=payload)
    job = start_response.get("job")
    if not isinstance(job, dict):
        raise RuntimeError("Async start response does not contain `job` object")
    job_id = str(job.get("job_id") or "")
    if not job_id:
        raise RuntimeError("Async start response does not contain job_id")
    final_job = wait_for_job(backend_url, job_id, args.timeout_seconds, args.poll_interval_seconds)
    if str(final_job.get("status") or "") != "completed":
        raise RuntimeError(f"Async job did not complete successfully: {final_job.get('status')}")
    run_id = str(final_job.get("run_id") or "")
    if not run_id:
        # Without it the session path would be "-AUTO-001.json" and we would just time out.
        raise RuntimeError("Completed async job does not contain run_id")
    # NOTE(review): presumably the id the backend gives the single submitted question — confirm.
    report_case_id = "AUTO-001"
    session_id = f"{run_id}-{report_case_id}"
    session_file = Path(args.sessions_dir).resolve() / f"{session_id}.json"
    wait_for_file(session_file)
    session_record = read_json_file(session_file)
    conversation = extract_conversation_from_session(session_record)
    export_markdown = build_conversation_export(session_id, conversation, mode="technical")
    report_case = None
    report_file = Path(args.reports_dir).resolve() / f"{run_id}.json"
    if report_file.exists():
        report_record = read_json_file(report_file)
        report_case = extract_report_case(report_record, report_case_id)
    turn_artifact = build_turn_artifact(
        slot=args.slot,
        domain=args.domain,
        case_id=case_id,
        question=args.question,
        session_id=session_id,
        conversation=conversation,
        session_record=session_record,
        job_record=final_job,
        report_case=report_case,
        export_file_name=f"{args.slot}_output.md",
    )
    save_capture_bundle(
        case_dir=case_dir,
        slot=args.slot,
        export_markdown=export_markdown,
        debug_payload=turn_artifact.get("technical_debug_payload"),
        turn_artifact=turn_artifact,
        session_record=session_record,
        job_record=final_job,
        report_case=report_case,
    )
    print(f"[domain-case-loop] saved {args.slot} artifacts to {case_dir}")
    print(f"[domain-case-loop] session_id={session_id}")
    return 0
def handle_import_export(args: argparse.Namespace) -> int:
    """Turn an existing markdown export into a capture bundle for a case.

    The export (read as UTF-8, BOM tolerated) is parsed. The case directory
    and brief are created. The export text itself is saved as
    ``<slot>_output.md``, next to the debug and turn JSON artifacts. The
    question is ``--question`` when given, otherwise the text of the user
    message before the last assistant reply.

    Returns:
        0 on success.
    """
    source_text = Path(args.input).read_text(encoding="utf-8-sig")
    session_id, conversation = parse_export_markdown(source_text)
    case_id = slugify_case_id(args.domain, args.case_id)
    case_dir = Path(args.output_root).resolve() / case_id
    case_dir.mkdir(parents=True, exist_ok=True)
    assistant_message = find_last_assistant(conversation)
    user_message = find_last_user_before(conversation, assistant_message.get("message_id"))
    if args.question:
        question = args.question
    elif isinstance(user_message, dict):
        question = user_message.get("text")
    else:
        question = None
    ensure_case_brief(
        case_dir,
        domain=args.domain,
        question=question,
        expected_capability=args.expected_capability,
        expected_result_mode=args.expected_result_mode,
    )
    output_name = f"{args.slot}_output.md"
    artifact = build_turn_artifact(
        slot=args.slot,
        domain=args.domain,
        case_id=case_id,
        question=question,
        session_id=session_id,
        conversation=conversation,
        session_record=None,
        job_record=None,
        report_case=None,
        export_file_name=output_name,
    )
    save_capture_bundle(
        case_dir=case_dir,
        slot=args.slot,
        export_markdown=source_text,
        debug_payload=assistant_message.get("debug"),
        turn_artifact=artifact,
        session_record=None,
        job_record=None,
        report_case=None,
    )
    print(f"[domain-case-loop] imported {args.slot} artifacts to {case_dir}")
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with the ``run-case`` and ``import-export`` subcommands.

    Each subcommand stores its handler in ``func`` for ``main`` to dispatch to;
    a subcommand is required.
    """
    slot_choices = ["baseline", "rerun"]
    run_case_arguments: list[tuple[str, dict[str, Any]]] = [
        ("--domain", {"required": True}),
        ("--question", {"required": True}),
        ("--case-id", {}),
        ("--slot", {"default": "baseline", "choices": slot_choices}),
        ("--analysis-date", {}),
        ("--backend-url", {"default": DEFAULT_BACKEND_URL}),
        ("--output-root", {"default": str(DEFAULT_ARTIFACTS_ROOT)}),
        ("--sessions-dir", {"default": str(DEFAULT_SESSIONS_DIR)}),
        ("--reports-dir", {"default": str(DEFAULT_REPORTS_DIR)}),
        ("--prompt-version", {"default": DEFAULT_PROMPT_VERSION}),
        ("--llm-provider", {"default": DEFAULT_LLM_PROVIDER, "choices": ["openai", "local"]}),
        ("--llm-model", {"default": DEFAULT_LLM_MODEL}),
        ("--llm-base-url", {"default": DEFAULT_LLM_BASE_URL}),
        ("--llm-api-key", {"default": DEFAULT_LLM_API_KEY}),
        ("--temperature", {"type": float, "default": DEFAULT_TEMPERATURE}),
        ("--max-output-tokens", {"type": int, "default": DEFAULT_MAX_OUTPUT_TOKENS}),
        ("--timeout-seconds", {"type": int, "default": 300}),
        ("--poll-interval-seconds", {"type": float, "default": 1.5}),
        ("--expected-capability", {}),
        ("--expected-result-mode", {}),
        ("--use-mock", {"action": "store_true"}),
    ]
    import_export_arguments: list[tuple[str, dict[str, Any]]] = [
        ("--domain", {"required": True}),
        ("--input", {"required": True}),
        ("--question", {}),
        ("--case-id", {}),
        ("--slot", {"default": "baseline", "choices": slot_choices}),
        ("--output-root", {"default": str(DEFAULT_ARTIFACTS_ROOT)}),
        ("--expected-capability", {}),
        ("--expected-result-mode", {}),
    ]
    parser = argparse.ArgumentParser(description="Repo-native helper for NDC_1C domain-case orchestration")
    subparsers = parser.add_subparsers(dest="command", required=True)
    commands = (
        (
            "run-case",
            "Run one assistant_stage1 case through the existing backend and save artifacts",
            run_case_arguments,
            handle_run_case,
        ),
        (
            "import-export",
            "Import an existing technical export markdown and build artifacts",
            import_export_arguments,
            handle_import_export,
        ),
    )
    for command_name, help_text, arguments, handler in commands:
        subparser = subparsers.add_parser(command_name, help=help_text)
        for flag, options in arguments:
            subparser.add_argument(flag, **options)
        subparser.set_defaults(func=handler)
    return parser
def main() -> int:
    """CLI entry point: parse arguments, run the selected handler, return an exit code.

    Any exception from the handler, or from converting its result to ``int``,
    is printed to stderr as ``[domain-case-loop] error: ...`` and mapped to
    exit code 1.
    """
    args = build_parser().parse_args()
    try:
        exit_code = int(args.func(args))
    except Exception as error:  # noqa: BLE001 - top-level CLI boundary
        print(f"[domain-case-loop] error: {error}", file=sys.stderr)
        return 1
    return exit_code
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())