281 lines
10 KiB
Python
281 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
DEFAULT_BACKEND_URL = "http://127.0.0.1:8787"
|
|
DEFAULT_PROXY_URL = "http://127.0.0.1:6003"
|
|
DEFAULT_CHANNEL = "default"
|
|
DEFAULT_PROBE_TIMEOUT_SECONDS = 190
|
|
DEFAULT_POLL_INTERVAL_SECONDS = 2
|
|
|
|
|
|
def dump_json(payload: Any) -> str:
|
|
return json.dumps(payload, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def normalize_base_url(value: str) -> str:
|
|
return str(value or "").strip().rstrip("/")
|
|
|
|
|
|
def request_json(
|
|
url: str,
|
|
*,
|
|
method: str = "GET",
|
|
body: dict[str, Any] | None = None,
|
|
timeout_seconds: float = 10,
|
|
) -> tuple[dict[str, Any] | None, str | None, float]:
|
|
payload: bytes | None = None
|
|
headers: dict[str, str] = {}
|
|
if body is not None:
|
|
payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
|
|
headers["content-type"] = "application/json; charset=utf-8"
|
|
|
|
request = urllib.request.Request(url, data=payload, headers=headers, method=method)
|
|
started_at = time.monotonic()
|
|
try:
|
|
with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
|
|
raw = response.read().decode("utf-8-sig")
|
|
except urllib.error.HTTPError as error:
|
|
elapsed = time.monotonic() - started_at
|
|
try:
|
|
raw_error = error.read().decode("utf-8-sig")
|
|
except Exception:
|
|
raw_error = str(error)
|
|
return None, f"HTTP {error.code}: {raw_error[:500]}", elapsed
|
|
except Exception as error:
|
|
elapsed = time.monotonic() - started_at
|
|
return None, str(error), elapsed
|
|
|
|
elapsed = time.monotonic() - started_at
|
|
try:
|
|
parsed = json.loads(raw) if raw.strip() else {}
|
|
except json.JSONDecodeError as error:
|
|
return None, f"Invalid JSON response: {error}", elapsed
|
|
if not isinstance(parsed, dict):
|
|
return None, "JSON response is not an object", elapsed
|
|
return parsed, None, elapsed
|
|
|
|
|
|
def write_text(path: Path, text: str) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(text, encoding="utf-8", newline="\n")
|
|
|
|
|
|
def build_verdict(payload: dict[str, Any]) -> dict[str, Any]:
|
|
backend_ok = payload.get("backend_health_ok") is True
|
|
proxy_ok = payload.get("proxy_health_ok") is True
|
|
live_probe = payload.get("live_probe")
|
|
live_probe_skipped = payload.get("live_probe_skipped")
|
|
|
|
live_probe_ok = None
|
|
if isinstance(live_probe, dict):
|
|
live_probe_ok = live_probe.get("ok") is True
|
|
|
|
if backend_ok and proxy_ok and live_probe_ok is True:
|
|
status = "ready"
|
|
reason = "backend, proxy, and direct read-only 1C probe returned successfully"
|
|
elif backend_ok and proxy_ok and live_probe_skipped:
|
|
status = "not_ready"
|
|
reason = str(live_probe_skipped)
|
|
elif backend_ok and proxy_ok and live_probe is None:
|
|
status = "health_only"
|
|
reason = "backend and proxy are healthy, but direct 1C evidence was not probed"
|
|
elif backend_ok and proxy_ok:
|
|
status = "not_ready"
|
|
reason = "backend and proxy are healthy, but direct 1C evidence did not return"
|
|
else:
|
|
status = "not_ready"
|
|
reason = "backend or proxy health check failed"
|
|
|
|
return {
|
|
"status": status,
|
|
"reason": reason,
|
|
"ready_for_live_replay": status == "ready",
|
|
}
|
|
|
|
|
|
def proxy_polling_count(proxy_health: Any) -> int | None:
|
|
if not isinstance(proxy_health, dict):
|
|
return None
|
|
raw_polling_count = proxy_health.get("polling_channels_count")
|
|
return raw_polling_count if isinstance(raw_polling_count, int) else None
|
|
|
|
|
|
def check_readiness(
|
|
*,
|
|
backend_url: str = DEFAULT_BACKEND_URL,
|
|
proxy_url: str = DEFAULT_PROXY_URL,
|
|
channel: str = DEFAULT_CHANNEL,
|
|
confirm_live: bool = False,
|
|
require_polling_before_live: bool = True,
|
|
wait_for_polling_seconds: float = 0,
|
|
poll_interval_seconds: float = DEFAULT_POLL_INTERVAL_SECONDS,
|
|
probe_timeout_seconds: float = DEFAULT_PROBE_TIMEOUT_SECONDS,
|
|
probe_limit: int = 1,
|
|
request_json_func=request_json,
|
|
) -> dict[str, Any]:
|
|
backend_url = normalize_base_url(backend_url)
|
|
proxy_url = normalize_base_url(proxy_url)
|
|
channel = str(channel or DEFAULT_CHANNEL).strip() or DEFAULT_CHANNEL
|
|
|
|
result: dict[str, Any] = {
|
|
"schema_version": "mcp_live_readiness_check_v1",
|
|
"backend_url": backend_url,
|
|
"proxy_url": proxy_url,
|
|
"channel": channel,
|
|
"backend_health_ok": False,
|
|
"proxy_health_ok": False,
|
|
}
|
|
|
|
backend_health, backend_error, backend_elapsed = request_json_func(
|
|
f"{backend_url}/api/health",
|
|
timeout_seconds=10,
|
|
)
|
|
result["backend_health"] = backend_health
|
|
result["backend_health_error"] = backend_error
|
|
result["backend_health_elapsed_seconds"] = round(backend_elapsed, 3)
|
|
result["backend_health_ok"] = isinstance(backend_health, dict) and backend_health.get("ok") is not False
|
|
|
|
proxy_health, proxy_error, proxy_elapsed = request_json_func(
|
|
f"{proxy_url}/health",
|
|
timeout_seconds=10,
|
|
)
|
|
result["proxy_health"] = proxy_health
|
|
result["proxy_health_error"] = proxy_error
|
|
result["proxy_health_elapsed_seconds"] = round(proxy_elapsed, 3)
|
|
result["proxy_health_ok"] = isinstance(proxy_health, dict) and str(proxy_health.get("status")) == "healthy"
|
|
|
|
should_probe_live = bool(confirm_live)
|
|
polling_count = proxy_polling_count(proxy_health)
|
|
if (
|
|
should_probe_live
|
|
and require_polling_before_live
|
|
and polling_count is not None
|
|
and polling_count <= 0
|
|
and wait_for_polling_seconds > 0
|
|
):
|
|
wait_started_at = time.monotonic()
|
|
wait_attempts = 0
|
|
wait_deadline = wait_started_at + float(wait_for_polling_seconds)
|
|
while time.monotonic() < wait_deadline:
|
|
time.sleep(max(0, float(poll_interval_seconds)))
|
|
wait_attempts += 1
|
|
proxy_health, proxy_error, proxy_elapsed = request_json_func(
|
|
f"{proxy_url}/health",
|
|
timeout_seconds=10,
|
|
)
|
|
result["proxy_health"] = proxy_health
|
|
result["proxy_health_error"] = proxy_error
|
|
result["proxy_health_elapsed_seconds"] = round(proxy_elapsed, 3)
|
|
result["proxy_health_ok"] = isinstance(proxy_health, dict) and str(proxy_health.get("status")) == "healthy"
|
|
polling_count = proxy_polling_count(proxy_health)
|
|
if polling_count is not None and polling_count > 0:
|
|
break
|
|
result["poll_wait"] = {
|
|
"requested_seconds": round(float(wait_for_polling_seconds), 3),
|
|
"elapsed_seconds": round(time.monotonic() - wait_started_at, 3),
|
|
"attempts": wait_attempts,
|
|
"observed_polling": polling_count is not None and polling_count > 0,
|
|
}
|
|
|
|
if should_probe_live and require_polling_before_live and polling_count is not None and polling_count <= 0:
|
|
should_probe_live = False
|
|
waited = result.get("poll_wait")
|
|
suffix = ""
|
|
if isinstance(waited, dict) and waited.get("attempts"):
|
|
suffix = f" after waiting {waited.get('elapsed_seconds')}s"
|
|
result["live_probe_skipped"] = (
|
|
"proxy is healthy, but no /1c/poll activity from a 1C client has been observed" + suffix
|
|
)
|
|
|
|
if should_probe_live:
|
|
query = urllib.parse.urlencode({"channel": channel})
|
|
probe_url = f"{proxy_url}/api/get_metadata?{query}"
|
|
probe_body = {"limit": max(1, int(probe_limit))}
|
|
probe_payload, probe_error, probe_elapsed = request_json_func(
|
|
probe_url,
|
|
method="POST",
|
|
body=probe_body,
|
|
timeout_seconds=max(1, float(probe_timeout_seconds)),
|
|
)
|
|
result["live_probe"] = {
|
|
"kind": "get_metadata",
|
|
"ok": isinstance(probe_payload, dict) and probe_payload.get("success") is True,
|
|
"elapsed_seconds": round(probe_elapsed, 3),
|
|
"response": probe_payload,
|
|
"error": probe_error,
|
|
}
|
|
|
|
result["verdict"] = build_verdict(result)
|
|
return result
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(
|
|
description="Check whether the NDC 1C backend/proxy and optional live 1C evidence probe are ready."
|
|
)
|
|
parser.add_argument("--backend-url", default=DEFAULT_BACKEND_URL)
|
|
parser.add_argument("--proxy-url", default=DEFAULT_PROXY_URL)
|
|
parser.add_argument("--channel", default=DEFAULT_CHANNEL)
|
|
parser.add_argument(
|
|
"--confirm-live",
|
|
action="store_true",
|
|
help="Run a direct read-only get_metadata probe through the proxy. This can take up to the proxy timeout.",
|
|
)
|
|
parser.add_argument("--probe-timeout-seconds", type=float, default=DEFAULT_PROBE_TIMEOUT_SECONDS)
|
|
parser.add_argument("--probe-limit", type=int, default=1)
|
|
parser.add_argument(
|
|
"--wait-for-polling-seconds",
|
|
type=float,
|
|
default=0,
|
|
help="When --confirm-live is set, wait this long for proxy health to observe /1c/poll before probing.",
|
|
)
|
|
parser.add_argument(
|
|
"--poll-interval-seconds",
|
|
type=float,
|
|
default=DEFAULT_POLL_INTERVAL_SECONDS,
|
|
help="Polling interval used with --wait-for-polling-seconds.",
|
|
)
|
|
parser.add_argument(
|
|
"--no-require-polling-before-live",
|
|
action="store_true",
|
|
help="Run the live probe even when proxy health has not observed any /1c/poll activity.",
|
|
)
|
|
parser.add_argument("--output-json", type=Path)
|
|
args = parser.parse_args()
|
|
|
|
result = check_readiness(
|
|
backend_url=args.backend_url,
|
|
proxy_url=args.proxy_url,
|
|
channel=args.channel,
|
|
confirm_live=bool(args.confirm_live),
|
|
require_polling_before_live=not bool(args.no_require_polling_before_live),
|
|
wait_for_polling_seconds=float(args.wait_for_polling_seconds),
|
|
poll_interval_seconds=float(args.poll_interval_seconds),
|
|
probe_timeout_seconds=float(args.probe_timeout_seconds),
|
|
probe_limit=int(args.probe_limit),
|
|
)
|
|
|
|
output = dump_json(result) + "\n"
|
|
if args.output_json:
|
|
write_text(args.output_json, output)
|
|
|
|
print(output, end="")
|
|
if result["verdict"]["ready_for_live_replay"]:
|
|
return 0
|
|
if not args.confirm_live and result["backend_health_ok"] and result["proxy_health_ok"]:
|
|
return 0
|
|
return 2
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|