"""HTTP API over the 1C OData probe and canonical mapping layer.

Serves canonical documents and postings, and lets clients inspect and trigger
refresh, feature-engine and risk-engine runs. The service objects are built
once, when this module is imported, and shared by every request handler.
"""

from __future__ import annotations

from collections.abc import Sequence
from datetime import datetime, timezone
from typing import Any

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel, Field

from canonical_layer.features import FeatureService
from canonical_layer.refresh import REFRESH_MODES, RefreshService
from canonical_layer.risk import RiskService
from canonical_layer.service import CanonicalService

app = FastAPI(
    title="1C Canonical Layer MVP",
    version="0.1.0",
    description="Read-only API over 1C OData probe and canonical mapping layer.",
)

# Service singletons: built once at import time and reused by all handlers.
service = CanonicalService.build()
refresh_service = RefreshService.build()
feature_service = FeatureService.build()
risk_service = RiskService.build()


class RefreshRunRequest(BaseModel):
    """Request body for ``POST /refresh/run``; forwarded to ``RefreshService.run_refresh``."""

    # Matched case-insensitively (after strip) against ``REFRESH_MODES``.
    mode: str = Field(default="incremental")
    date_from: str | None = None
    date_to: str | None = None
    target_id: str | None = None
    limit_per_set: int | None = Field(default=None, ge=1, le=5000)
    # Forwarded as ``requested_entity_sets``.
    entity_sets: list[str] | None = None
    # Forwarded as ``entity_keywords``.
    keywords: list[str] | None = None


class FeatureRunRequest(BaseModel):
    """Request body for ``POST /features/run``; forwarded to ``FeatureService.run_feature_engine``."""

    # ``None`` lets the feature service pick its own value (presumably a
    # configured default -- confirm in FeatureService).
    baseline_window_hours: int | None = Field(default=None, ge=1, le=720)
    stale_refresh_threshold_hours: int | None = Field(default=None, ge=1, le=720)
    top_account_tokens: int = Field(default=20, ge=1, le=200)
    entity_limit: int | None = Field(default=None, ge=1, le=200000)


class RiskRunRequest(BaseModel):
    """Request body for ``POST /risk/run``; forwarded to ``RiskService.run_risk_engine``."""

    # ``None`` leaves the choice of feature run to the risk service.
    source_feature_run_id: str | None = None
    anomaly_limit: int | None = Field(default=None, ge=1, le=20000)


def _listing(items: Sequence[Any]) -> dict[str, Any]:
    """Wrap already-serialized *items* in the ``{"total", "items"}`` list envelope.

    ``total`` is the number of items returned, not a count over the whole store.
    """
    return {"total": len(items), "items": items}


@app.get("/health")
def health() -> dict[str, str]:
    """Report service status together with the current UTC time (ISO-8601)."""
    return {
        "status": "ok",
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


@app.get("/metadata/entity-sets")
def metadata_entity_sets() -> dict[str, Any]:
    """Return the entity-set listing produced by the canonical service."""
    return service.list_entity_sets()


@app.get("/documents")
def list_documents(
    from_date: str | None = Query(default=None, alias="from"),
    to_date: str | None = Query(default=None, alias="to"),
    limit: int = Query(default=100, ge=1, le=500),
) -> dict[str, Any]:
    """List canonical documents, optionally bounded by the ``from``/``to`` query dates."""
    items = service.get_documents(date_from=from_date, date_to=to_date, limit=limit)
    return _listing([item.model_dump() for item in items])


@app.get("/documents/{document_id}")
def get_document(document_id: str) -> dict[str, Any]:
    """Return a single canonical document.

    Raises:
        HTTPException: 404 when the service has no document with this id.
    """
    document = service.get_document(document_id)
    if document is None:
        raise HTTPException(status_code=404, detail="Document not found in current probe window")
    return document.model_dump()


@app.get("/postings")
def list_postings(
    account: str | None = Query(default=None),
    from_date: str | None = Query(default=None, alias="from"),
    to_date: str | None = Query(default=None, alias="to"),
    limit: int = Query(default=100, ge=1, le=500),
) -> dict[str, Any]:
    """List canonical postings, optionally filtered by account and ``from``/``to`` dates."""
    items = service.get_postings(account=account, date_from=from_date, date_to=to_date, limit=limit)
    return _listing([item.model_dump() for item in items])


@app.get("/counterparties/{counterparty_id}/documents")
def counterparty_documents(
    counterparty_id: str,
    limit: int = Query(default=100, ge=1, le=500),
) -> dict[str, Any]:
    """List documents linked to a counterparty; the id is echoed back in the response."""
    items = service.get_counterparty_documents(counterparty_id=counterparty_id, limit=limit)
    # ``counterparty_id`` stays the first key, ahead of the listing envelope.
    return {
        "counterparty_id": counterparty_id,
        **_listing([item.model_dump() for item in items]),
    }


@app.get("/graph/document/{document_id}")
def document_graph(document_id: str) -> dict[str, Any]:
    """Return the graph the canonical service builds around one document."""
    return service.build_document_graph(document_id)


@app.get("/store/stats")
def store_stats() -> dict[str, Any]:
    """Return the refresh store statistics."""
    return refresh_service.store_stats()


@app.get("/refresh/runs")
def refresh_runs(limit: int = Query(default=20, ge=1, le=200)) -> dict[str, Any]:
    """List up to ``limit`` recent refresh runs."""
    return _listing(refresh_service.list_recent_runs(limit=limit))


@app.post("/refresh/run")
def run_refresh(request: RefreshRunRequest) -> dict[str, Any]:
    """Execute a refresh run and return its result plus the store statistics.

    Raises:
        HTTPException: 400 when ``mode`` (after strip/lower) is not in ``REFRESH_MODES``.
    """
    # Normalize so callers can send e.g. " Incremental "; the error echoes the raw value.
    mode = request.mode.strip().lower()
    if mode not in REFRESH_MODES:
        raise HTTPException(status_code=400, detail=f"Unsupported mode '{request.mode}'")
    result = refresh_service.run_refresh(
        mode=mode,
        date_from=request.date_from,
        date_to=request.date_to,
        target_id=request.target_id,
        limit_per_set=request.limit_per_set,
        requested_entity_sets=request.entity_sets,
        entity_keywords=request.keywords,
    )
    payload = result.to_dict()
    # Stats are read after the run has returned.
    payload["store_stats"] = refresh_service.store_stats()
    return payload


@app.get("/features/stats")
def feature_stats() -> dict[str, Any]:
    """Return the feature store statistics."""
    return feature_service.stats()


@app.get("/features/runs")
def feature_runs(limit: int = Query(default=20, ge=1, le=200)) -> dict[str, Any]:
    """List up to ``limit`` recent feature-engine runs."""
    return _listing(feature_service.list_recent_runs(limit=limit))


@app.get("/features/metrics")
def feature_metrics(
    limit: int = Query(default=200, ge=1, le=2000),
    metric_key: str | None = Query(default=None),
    scope: str | None = Query(default=None),
    run_id: str | None = Query(default=None),
) -> dict[str, Any]:
    """List feature metrics, optionally filtered by metric key, scope and run id."""
    items = feature_service.list_metrics(
        limit=limit,
        metric_key=metric_key,
        scope=scope,
        run_id=run_id,
    )
    return _listing(items)


@app.get("/features/anomalies")
def feature_anomalies(
    limit: int = Query(default=200, ge=1, le=2000),
    severity: str | None = Query(default=None),
    active_only: bool = Query(default=True),
    run_id: str | None = Query(default=None),
) -> dict[str, Any]:
    """List feature anomalies, optionally filtered by severity, activity and run id."""
    items = feature_service.list_anomalies(
        limit=limit,
        severity=severity,
        active_only=active_only,
        run_id=run_id,
    )
    return _listing(items)


@app.post("/features/run")
def run_features(request: FeatureRunRequest) -> dict[str, Any]:
    """Execute the feature engine and return its result plus feature/refresh store stats."""
    result = feature_service.run_feature_engine(
        baseline_window_hours=request.baseline_window_hours,
        stale_refresh_threshold_hours=request.stale_refresh_threshold_hours,
        top_account_tokens=request.top_account_tokens,
        entity_limit=request.entity_limit,
    )
    payload = result.to_dict()
    payload["feature_store_stats"] = feature_service.stats()
    payload["store_stats"] = refresh_service.store_stats()
    return payload


@app.get("/risk/stats")
def risk_stats() -> dict[str, Any]:
    """Return the risk store statistics."""
    return risk_service.stats()


@app.get("/risk/runs")
def risk_runs(limit: int = Query(default=20, ge=1, le=200)) -> dict[str, Any]:
    """List up to ``limit`` recent risk-engine runs."""
    return _listing(risk_service.list_recent_runs(limit=limit))


@app.get("/risk/patterns")
def risk_patterns(
    limit: int = Query(default=200, ge=1, le=2000),
    severity: str | None = Query(default=None),
    active_only: bool = Query(default=True),
    run_id: str | None = Query(default=None),
    pattern_key: str | None = Query(default=None),
    scope: str | None = Query(default=None),
) -> dict[str, Any]:
    """List risk patterns, optionally filtered by severity, activity, run, pattern key and scope."""
    items = risk_service.list_patterns(
        limit=limit,
        severity=severity,
        active_only=active_only,
        run_id=run_id,
        pattern_key=pattern_key,
        scope=scope,
    )
    return _listing(items)


@app.post("/risk/run")
def run_risk(request: RiskRunRequest) -> dict[str, Any]:
    """Execute the risk engine and return its result plus risk/feature/refresh store stats."""
    result = risk_service.run_risk_engine(
        source_feature_run_id=request.source_feature_run_id,
        anomaly_limit=request.anomaly_limit,
    )
    payload = result.to_dict()
    payload["risk_store_stats"] = risk_service.stats()
    payload["feature_store_stats"] = feature_service.stats()
    payload["store_stats"] = refresh_service.store_stats()
    return payload