diff --git a/external/1c-mcp-toolkit b/external/1c-mcp-toolkit index 1d20152..b47c3d1 160000 --- a/external/1c-mcp-toolkit +++ b/external/1c-mcp-toolkit @@ -1 +1 @@ -Subproject commit 1d2015214735d2c3d253ba184e0a45efd9ce6c5f +Subproject commit b47c3d124ae17bc4b481d1e2e88c89a7c1418b63 diff --git a/scripts/check_mcp_live_readiness.py b/scripts/check_mcp_live_readiness.py new file mode 100644 index 0000000..721ff35 --- /dev/null +++ b/scripts/check_mcp_live_readiness.py @@ -0,0 +1,280 @@ +from __future__ import annotations + +import argparse +import json +import time +import urllib.error +import urllib.parse +import urllib.request +from pathlib import Path +from typing import Any + + +DEFAULT_BACKEND_URL = "http://127.0.0.1:8787" +DEFAULT_PROXY_URL = "http://127.0.0.1:6003" +DEFAULT_CHANNEL = "default" +DEFAULT_PROBE_TIMEOUT_SECONDS = 190 +DEFAULT_POLL_INTERVAL_SECONDS = 2 + + +def dump_json(payload: Any) -> str: + return json.dumps(payload, ensure_ascii=False, indent=2) + + +def normalize_base_url(value: str) -> str: + return str(value or "").strip().rstrip("/") + + +def request_json( + url: str, + *, + method: str = "GET", + body: dict[str, Any] | None = None, + timeout_seconds: float = 10, +) -> tuple[dict[str, Any] | None, str | None, float]: + payload: bytes | None = None + headers: dict[str, str] = {} + if body is not None: + payload = json.dumps(body, ensure_ascii=False).encode("utf-8") + headers["content-type"] = "application/json; charset=utf-8" + + request = urllib.request.Request(url, data=payload, headers=headers, method=method) + started_at = time.monotonic() + try: + with urllib.request.urlopen(request, timeout=timeout_seconds) as response: + raw = response.read().decode("utf-8-sig") + except urllib.error.HTTPError as error: + elapsed = time.monotonic() - started_at + try: + raw_error = error.read().decode("utf-8-sig") + except Exception: + raw_error = str(error) + return None, f"HTTP {error.code}: {raw_error[:500]}", elapsed + except Exception as error: + elapsed = time.monotonic() - started_at + return None, str(error), elapsed + + elapsed = time.monotonic() - started_at + try: + parsed = json.loads(raw) if raw.strip() else {} + except json.JSONDecodeError as error: + return None, f"Invalid JSON response: {error}", elapsed + if not isinstance(parsed, dict): + return None, "JSON response is not an object", elapsed + return parsed, None, elapsed + + +def write_text(path: Path, text: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(text, encoding="utf-8", newline="\n") + + +def build_verdict(payload: dict[str, Any]) -> dict[str, Any]: + backend_ok = payload.get("backend_health_ok") is True + proxy_ok = payload.get("proxy_health_ok") is True + live_probe = payload.get("live_probe") + live_probe_skipped = payload.get("live_probe_skipped") + + live_probe_ok = None + if isinstance(live_probe, dict): + live_probe_ok = live_probe.get("ok") is True + + if backend_ok and proxy_ok and live_probe_ok is True: + status = "ready" + reason = "backend, proxy, and direct read-only 1C probe returned successfully" + elif backend_ok and proxy_ok and live_probe_skipped: + status = "not_ready" + reason = str(live_probe_skipped) + elif backend_ok and proxy_ok and live_probe is None: + status = "health_only" + reason = "backend and proxy are healthy, but direct 1C evidence was not probed" + elif backend_ok and proxy_ok: + status = "not_ready" + reason = "backend and proxy are healthy, but direct 1C evidence did not return" + else: + status = "not_ready" + reason = "backend or proxy health check failed" + + return { + "status": status, + "reason": reason, + "ready_for_live_replay": status == "ready", + } + + +def proxy_polling_count(proxy_health: Any) -> int | None: + if not isinstance(proxy_health, dict): + return None + raw_polling_count = proxy_health.get("polling_channels_count") + return raw_polling_count if isinstance(raw_polling_count, int) else None + + +def check_readiness( + *, + backend_url: str = DEFAULT_BACKEND_URL, + proxy_url: str = DEFAULT_PROXY_URL, + channel: str = DEFAULT_CHANNEL, + confirm_live: bool = False, + require_polling_before_live: bool = True, + wait_for_polling_seconds: float = 0, + poll_interval_seconds: float = DEFAULT_POLL_INTERVAL_SECONDS, + probe_timeout_seconds: float = DEFAULT_PROBE_TIMEOUT_SECONDS, + probe_limit: int = 1, + request_json_func=request_json, +) -> dict[str, Any]: + backend_url = normalize_base_url(backend_url) + proxy_url = normalize_base_url(proxy_url) + channel = str(channel or DEFAULT_CHANNEL).strip() or DEFAULT_CHANNEL + + result: dict[str, Any] = { + "schema_version": "mcp_live_readiness_check_v1", + "backend_url": backend_url, + "proxy_url": proxy_url, + "channel": channel, + "backend_health_ok": False, + "proxy_health_ok": False, + } + + backend_health, backend_error, backend_elapsed = request_json_func( + f"{backend_url}/api/health", + timeout_seconds=10, + ) + result["backend_health"] = backend_health + result["backend_health_error"] = backend_error + result["backend_health_elapsed_seconds"] = round(backend_elapsed, 3) + result["backend_health_ok"] = isinstance(backend_health, dict) and backend_health.get("ok") is not False + + proxy_health, proxy_error, proxy_elapsed = request_json_func( + f"{proxy_url}/health", + timeout_seconds=10, + ) + result["proxy_health"] = proxy_health + result["proxy_health_error"] = proxy_error + result["proxy_health_elapsed_seconds"] = round(proxy_elapsed, 3) + result["proxy_health_ok"] = isinstance(proxy_health, dict) and str(proxy_health.get("status")) == "healthy" + + should_probe_live = bool(confirm_live) + polling_count = proxy_polling_count(proxy_health) + if ( + should_probe_live + and require_polling_before_live + and polling_count is not None + and polling_count <= 0 + and wait_for_polling_seconds > 0 + ): + wait_started_at = time.monotonic() + wait_attempts = 0 + wait_deadline = wait_started_at + float(wait_for_polling_seconds) + while time.monotonic() < wait_deadline: + time.sleep(max(0, float(poll_interval_seconds))) + wait_attempts += 1 + proxy_health, proxy_error, proxy_elapsed = request_json_func( + f"{proxy_url}/health", + timeout_seconds=10, + ) + result["proxy_health"] = proxy_health + result["proxy_health_error"] = proxy_error + result["proxy_health_elapsed_seconds"] = round(proxy_elapsed, 3) + result["proxy_health_ok"] = isinstance(proxy_health, dict) and str(proxy_health.get("status")) == "healthy" + polling_count = proxy_polling_count(proxy_health) + if polling_count is not None and polling_count > 0: + break + result["poll_wait"] = { + "requested_seconds": round(float(wait_for_polling_seconds), 3), + "elapsed_seconds": round(time.monotonic() - wait_started_at, 3), + "attempts": wait_attempts, + "observed_polling": polling_count is not None and polling_count > 0, + } + + if should_probe_live and require_polling_before_live and polling_count is not None and polling_count <= 0: + should_probe_live = False + waited = result.get("poll_wait") + suffix = "" + if isinstance(waited, dict) and waited.get("attempts"): + suffix = f" after waiting {waited.get('elapsed_seconds')}s" + result["live_probe_skipped"] = ( + "proxy is healthy, but no /1c/poll activity from a 1C client has been observed" + suffix + ) + + if should_probe_live: + query = urllib.parse.urlencode({"channel": channel}) + probe_url = f"{proxy_url}/api/get_metadata?{query}" + probe_body = {"limit": max(1, int(probe_limit))} + probe_payload, probe_error, probe_elapsed = request_json_func( + probe_url, + method="POST", + body=probe_body, + timeout_seconds=max(1, float(probe_timeout_seconds)), + ) + result["live_probe"] = { + "kind": "get_metadata", + "ok": isinstance(probe_payload, dict) and probe_payload.get("success") is True, + "elapsed_seconds": round(probe_elapsed, 3), + "response": probe_payload, + "error": probe_error, + } + + result["verdict"] = build_verdict(result) + return result + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Check whether the NDC 1C backend/proxy and optional live 1C evidence probe are ready." + ) + parser.add_argument("--backend-url", default=DEFAULT_BACKEND_URL) + parser.add_argument("--proxy-url", default=DEFAULT_PROXY_URL) + parser.add_argument("--channel", default=DEFAULT_CHANNEL) + parser.add_argument( + "--confirm-live", + action="store_true", + help="Run a direct read-only get_metadata probe through the proxy. This can take up to the proxy timeout.", + ) + parser.add_argument("--probe-timeout-seconds", type=float, default=DEFAULT_PROBE_TIMEOUT_SECONDS) + parser.add_argument("--probe-limit", type=int, default=1) + parser.add_argument( + "--wait-for-polling-seconds", + type=float, + default=0, + help="When --confirm-live is set, wait this long for proxy health to observe /1c/poll before probing.", + ) + parser.add_argument( + "--poll-interval-seconds", + type=float, + default=DEFAULT_POLL_INTERVAL_SECONDS, + help="Polling interval used with --wait-for-polling-seconds.", + ) + parser.add_argument( + "--no-require-polling-before-live", + action="store_true", + help="Run the live probe even when proxy health has not observed any /1c/poll activity.", + ) + parser.add_argument("--output-json", type=Path) + args = parser.parse_args() + + result = check_readiness( + backend_url=args.backend_url, + proxy_url=args.proxy_url, + channel=args.channel, + confirm_live=bool(args.confirm_live), + require_polling_before_live=not bool(args.no_require_polling_before_live), + wait_for_polling_seconds=float(args.wait_for_polling_seconds), + poll_interval_seconds=float(args.poll_interval_seconds), + probe_timeout_seconds=float(args.probe_timeout_seconds), + probe_limit=int(args.probe_limit), + ) + + output = dump_json(result) + "\n" + if args.output_json: + write_text(args.output_json, output) + + print(output, end="") + if result["verdict"]["ready_for_live_replay"]: + return 0 + if not args.confirm_live and result["backend_health_ok"] and result["proxy_health_ok"]: + return 0 + return 2 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/domain_truth_harness.py b/scripts/domain_truth_harness.py index 12a705e..c468b54 100644 --- a/scripts/domain_truth_harness.py +++ b/scripts/domain_truth_harness.py @@ -9,6 +9,7 @@ from types import SimpleNamespace from typing import Any import domain_case_loop as dcl +import check_mcp_live_readiness as mcp_readiness import scenario_acceptance_policy as sap @@ -1137,6 +1138,29 @@ def handle_run_live(args: argparse.Namespace) -> int: output_dir = Path(args.output_dir).resolve() if args.output_dir else default_output_dir( f"{spec['scenario_id']}_live" ) + if args.require_mcp_live_readiness: + output_dir.mkdir(parents=True, exist_ok=True) + readiness = mcp_readiness.check_readiness( + backend_url=args.backend_url, + proxy_url=args.mcp_proxy_url, + channel=args.mcp_channel, + confirm_live=True, + require_polling_before_live=not bool(args.mcp_live_probe_without_observed_polling), + wait_for_polling_seconds=float(args.mcp_wait_for_polling_seconds), + poll_interval_seconds=float(args.mcp_poll_interval_seconds), + probe_timeout_seconds=float(args.mcp_readiness_probe_timeout_seconds), + probe_limit=int(args.mcp_readiness_probe_limit), + ) + write_json(output_dir / "mcp_live_readiness.json", readiness) + print( + "[truth-harness] mcp-live-readiness " + f"status={readiness['verdict']['status']} " + f"ready={readiness['verdict']['ready_for_live_replay']} " + f"reason={readiness['verdict']['reason']}" + ) + if not readiness["verdict"]["ready_for_live_replay"]: + print(f"[truth-harness] run-live skipped before first step; artifacts={output_dir}") + return 2 result = run_live(spec, output_dir, args) print(f"[truth-harness] run-live overall_status={result['review_summary']['overall_status']}") print(f"[truth-harness] run-live final_status={result['pack_state']['final_status']}") @@ -1182,6 +1206,35 @@ def build_parser() -> argparse.ArgumentParser: run_live_cmd.add_argument("--max-output-tokens", type=int, default=dcl.DEFAULT_MAX_OUTPUT_TOKENS) run_live_cmd.add_argument("--timeout-seconds", type=int, default=120) run_live_cmd.add_argument("--use-mock", action="store_true") + run_live_cmd.add_argument( + "--require-mcp-live-readiness", + action="store_true", + help="Run a backend/proxy/live-1C readiness gate before sending the first assistant turn.", + ) + run_live_cmd.add_argument("--mcp-proxy-url", default=mcp_readiness.DEFAULT_PROXY_URL) + run_live_cmd.add_argument("--mcp-channel", default=mcp_readiness.DEFAULT_CHANNEL) + run_live_cmd.add_argument( + "--mcp-readiness-probe-timeout-seconds", + type=float, + default=mcp_readiness.DEFAULT_PROBE_TIMEOUT_SECONDS, + ) + run_live_cmd.add_argument("--mcp-readiness-probe-limit", type=int, default=1) + run_live_cmd.add_argument( + "--mcp-wait-for-polling-seconds", + type=float, + default=0, + help="Wait for proxy health to observe /1c/poll activity before the live readiness probe.", + ) + run_live_cmd.add_argument( + "--mcp-poll-interval-seconds", + type=float, + default=mcp_readiness.DEFAULT_POLL_INTERVAL_SECONDS, + ) + run_live_cmd.add_argument( + "--mcp-live-probe-without-observed-polling", + action="store_true", + help="Allow the preflight live probe even when proxy health has not observed /1c/poll activity.", + ) run_live_cmd.set_defaults(func=handle_run_live) return parser diff --git a/scripts/test_mcp_live_readiness.py b/scripts/test_mcp_live_readiness.py new file mode 100644 index 0000000..c0a1b4f --- /dev/null +++ b/scripts/test_mcp_live_readiness.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +import sys +import unittest +from pathlib import Path +from typing import Any + + +sys.path.insert(0, str(Path(__file__).resolve().parent)) + +import check_mcp_live_readiness as readiness + + +class McpLiveReadinessTests(unittest.TestCase): + def test_confirm_live_skips_probe_when_proxy_reports_no_polling(self) -> None: + calls: list[tuple[str, str]] = [] + + def fake_request_json(url: str, **kwargs: Any) -> tuple[dict[str, Any] | None, str | None, float]: + calls.append((url, str(kwargs.get("method") or "GET"))) + if url.endswith("/api/health"): + return {"ok": True}, None, 0.01 + if url.endswith("/health"): + return {"status": "healthy", "polling_channels_count": 0}, None, 0.01 + return {"success": True}, None, 0.01 + + result = readiness.check_readiness(confirm_live=True, request_json_func=fake_request_json) + + self.assertEqual(result["verdict"]["status"], "not_ready") + self.assertFalse(result["verdict"]["ready_for_live_replay"]) + self.assertIn("/1c/poll", result["verdict"]["reason"]) + self.assertNotIn(("http://127.0.0.1:6003/api/get_metadata?channel=default", "POST"), calls) + + def test_confirm_live_allows_probe_when_polling_was_observed(self) -> None: + calls: list[tuple[str, str]] = [] + + def fake_request_json(url: str, **kwargs: Any) -> tuple[dict[str, Any] | None, str | None, float]: + calls.append((url, str(kwargs.get("method") or "GET"))) + if url.endswith("/api/health"): + return {"ok": True}, None, 0.01 + if url.endswith("/health"): + return {"status": "healthy", "polling_channels_count": 1}, None, 0.01 + return {"success": True, "data": []}, None, 0.02 + + result = readiness.check_readiness(confirm_live=True, request_json_func=fake_request_json) + + self.assertEqual(result["verdict"]["status"], "ready") + self.assertTrue(result["verdict"]["ready_for_live_replay"]) + self.assertIn(("http://127.0.0.1:6003/api/get_metadata?channel=default", "POST"), calls) + + def test_confirm_live_waits_for_polling_before_probe(self) -> None: + proxy_health_calls = 0 + + def fake_request_json(url: str, **kwargs: Any) -> tuple[dict[str, Any] | None, str | None, float]: + nonlocal proxy_health_calls + if url.endswith("/api/health"): + return {"ok": True}, None, 0.01 + if url.endswith("/health"): + proxy_health_calls += 1 + return {"status": "healthy", "polling_channels_count": 1 if proxy_health_calls >= 2 else 0}, None, 0.01 + return {"success": True, "data": []}, None, 0.02 + + result = readiness.check_readiness( + confirm_live=True, + wait_for_polling_seconds=0.1, + poll_interval_seconds=0, + request_json_func=fake_request_json, + ) + + self.assertEqual(result["verdict"]["status"], "ready") + self.assertTrue(result["poll_wait"]["observed_polling"]) + self.assertGreaterEqual(proxy_health_calls, 2) + + +if __name__ == "__main__": + unittest.main()