# NODEDC_1C/scripts/run_risk.py
#
# 69 lines
# 1.9 KiB
# Python

from __future__ import annotations
import argparse
import json
from pathlib import Path
import sys
# Directory two levels above this file. It is pushed to the front of
# sys.path (only if absent) so the project-local imports that follow this
# block resolve when the script is executed directly.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
_project_root_entry = str(PROJECT_ROOT)
if _project_root_entry not in sys.path:
    sys.path.insert(0, _project_root_entry)
from canonical_layer.risk import RiskService
from config.settings import LOGS_DIR
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the risk scoring script.

    Args:
        argv: Argument strings to parse. When ``None`` (the default)
            argparse falls back to ``sys.argv[1:]``, so existing callers
            that pass nothing behave exactly as before.

    Returns:
        Namespace with the attributes ``source_feature_run_id``
        (default ``None``), ``anomaly_limit`` (``int``, default ``None``),
        ``output`` (defaults to ``risk_last_run.json`` under ``LOGS_DIR``)
        and ``strict`` (``bool``, default ``False``).
    """
    parser = argparse.ArgumentParser(
        description="Run risk scoring engine from feature/anomaly layer",
    )
    parser.add_argument(
        "--source-feature-run-id",
        default=None,
        help="Optional explicit feature run id to score",
    )
    parser.add_argument(
        "--anomaly-limit",
        type=int,
        default=None,
        help="How many anomalies to scan",
    )
    parser.add_argument(
        "--output",
        default=str(LOGS_DIR / "risk_last_run.json"),
        help="Where to write run summary json",
    )
    parser.add_argument(
        "--strict",
        action="store_true",
        help="Exit with code 1 if run status is not success",
    )
    # Forwarding argv lets other code drive the parser without touching
    # sys.argv.
    return parser.parse_args(argv)
def main() -> int:
    """Run the risk engine once, save the run summary, and pick an exit code.

    The summary is the engine result's ``to_dict()`` output extended with
    the ``risk_store_stats``, ``risk_runs`` and ``risk_patterns`` entries
    obtained from the service. It is written as UTF-8 JSON to the
    ``--output`` path (parent directories are created if missing) and
    echoed to stdout.

    Returns:
        ``1`` when ``--strict`` was passed and the run status is not
        ``"success"``; ``0`` in every other case.
    """
    args = parse_args()
    service = RiskService.build()
    result = service.run_risk_engine(
        source_feature_run_id=args.source_feature_run_id,
        anomaly_limit=args.anomaly_limit,
    )

    payload = result.to_dict()
    payload["risk_store_stats"] = service.stats()
    payload["risk_runs"] = service.list_recent_runs(limit=5)
    payload["risk_patterns"] = service.list_patterns(limit=200, active_only=True)

    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Serialise once and reuse the text: the file and stdout are then
    # guaranteed to carry the same document, and the payload is not
    # encoded twice.
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    output_path.write_text(rendered, encoding="utf-8")
    print(rendered)

    return 1 if args.strict and result.status != "success" else 0
# Script entry point: the integer returned by main() becomes the process
# exit status.
if __name__ == "__main__":
    raise SystemExit(main())