from __future__ import annotations from dataclasses import dataclass from pathlib import Path import os from dotenv import load_dotenv PROJECT_ROOT = Path(__file__).resolve().parents[1] LOGS_DIR = PROJECT_ROOT / "logs" DATA_DIR = PROJECT_ROOT / "data" def _as_bool(raw: str | None, default: bool) -> bool: if raw is None: return default normalized = raw.strip().lower() return normalized in {"1", "true", "yes", "on"} def _normalize_odata_path(path: str) -> str: stripped = path.strip() if not stripped: return "/odata/standard.odata/" if not stripped.startswith("/"): stripped = "/" + stripped if not stripped.endswith("/"): stripped = stripped + "/" return stripped def _default_canonical_db_url() -> str: db_path = DATA_DIR / "canonical_store.db" return f"sqlite:///{db_path.as_posix()}" @dataclass(frozen=True) class OneCSettings: base_url: str infobase: str username: str password: str odata_path: str timeout: int verify_tls: bool probe_top: int probe_entity_sets: tuple[str, ...] canonical_db_url: str refresh_default_limit_per_set: int refresh_default_entity_keywords: tuple[str, ...] feature_default_baseline_window_hours: int anomaly_stale_refresh_threshold_hours: int feature_entity_scan_limit: int risk_medium_threshold: float risk_high_threshold: float risk_anomaly_scan_limit: int @property def service_root(self) -> str: base = self.base_url.rstrip("/") infobase = self.infobase.strip().strip("/") if infobase: return f"{base}/{infobase}{self.odata_path}" return f"{base}{self.odata_path}" @property def metadata_url(self) -> str: return f"{self.service_root}$metadata" def load_settings() -> OneCSettings: env_file = PROJECT_ROOT / ".env" if env_file.exists(): load_dotenv(env_file) base_url = os.getenv("ONEC_BASE_URL", "http://localhost").strip() infobase = os.getenv("ONEC_INFOBASE", "AccountingBase").strip() username = os.getenv("ONEC_USERNAME", "").strip() password = os.getenv("ONEC_PASSWORD", "") odata_path = _normalize_odata_path(os.getenv("ONEC_ODATA_PATH", "/odata/standard.odata/")) timeout = int(os.getenv("ONEC_TIMEOUT", "30").strip()) verify_tls = _as_bool(os.getenv("ONEC_VERIFY_TLS"), default=False) probe_top = int(os.getenv("ONEC_PROBE_TOP", "5").strip()) probe_entity_sets_raw = os.getenv("ONEC_PROBE_ENTITY_SETS", "") probe_entity_sets = tuple( item.strip() for item in probe_entity_sets_raw.split(",") if item.strip() ) canonical_db_url = os.getenv("CANONICAL_DB_URL", _default_canonical_db_url()).strip() refresh_default_limit_per_set = int(os.getenv("REFRESH_DEFAULT_LIMIT_PER_SET", "200").strip()) refresh_default_keywords_raw = os.getenv( "REFRESH_DEFAULT_ENTITY_KEYWORDS", "document,posting,movement,register,account,counterparty,contract,organization,subconto,item,warehouse", ) refresh_default_entity_keywords = tuple( item.strip().lower() for item in refresh_default_keywords_raw.split(",") if item.strip() ) feature_default_baseline_window_hours = int(os.getenv("FEATURE_BASELINE_WINDOW_HOURS", "24").strip()) anomaly_stale_refresh_threshold_hours = int(os.getenv("ANOMALY_STALE_REFRESH_THRESHOLD_HOURS", "6").strip()) feature_entity_scan_limit = int(os.getenv("FEATURE_ENTITY_SCAN_LIMIT", "200000").strip()) risk_medium_threshold = float(os.getenv("RISK_MEDIUM_THRESHOLD", "0.45").strip()) risk_high_threshold = float(os.getenv("RISK_HIGH_THRESHOLD", "0.75").strip()) risk_anomaly_scan_limit = int(os.getenv("RISK_ANOMALY_SCAN_LIMIT", "5000").strip()) return OneCSettings( base_url=base_url, infobase=infobase, username=username, password=password, odata_path=odata_path, timeout=timeout, verify_tls=verify_tls, probe_top=probe_top, probe_entity_sets=probe_entity_sets, canonical_db_url=canonical_db_url, refresh_default_limit_per_set=refresh_default_limit_per_set, refresh_default_entity_keywords=refresh_default_entity_keywords, feature_default_baseline_window_hours=feature_default_baseline_window_hours, anomaly_stale_refresh_threshold_hours=anomaly_stale_refresh_threshold_hours, feature_entity_scan_limit=feature_entity_scan_limit, risk_medium_threshold=risk_medium_threshold, risk_high_threshold=risk_high_threshold, risk_anomaly_scan_limit=risk_anomaly_scan_limit, )