NODEDC_1C/config/settings.py

130 lines
4.6 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import os
from dotenv import load_dotenv
PROJECT_ROOT = Path(__file__).resolve().parents[1]
LOGS_DIR = PROJECT_ROOT / "logs"
DATA_DIR = PROJECT_ROOT / "data"
def _as_bool(raw: str | None, default: bool) -> bool:
if raw is None:
return default
normalized = raw.strip().lower()
return normalized in {"1", "true", "yes", "on"}
def _normalize_odata_path(path: str) -> str:
stripped = path.strip()
if not stripped:
return "/odata/standard.odata/"
if not stripped.startswith("/"):
stripped = "/" + stripped
if not stripped.endswith("/"):
stripped = stripped + "/"
return stripped
def _default_canonical_db_url() -> str:
db_path = DATA_DIR / "canonical_store.db"
return f"sqlite:///{db_path.as_posix()}"
@dataclass(frozen=True)
class OneCSettings:
base_url: str
infobase: str
username: str
password: str
odata_path: str
timeout: int
verify_tls: bool
probe_top: int
probe_entity_sets: tuple[str, ...]
canonical_db_url: str
refresh_default_limit_per_set: int
refresh_default_entity_keywords: tuple[str, ...]
feature_default_baseline_window_hours: int
anomaly_stale_refresh_threshold_hours: int
feature_entity_scan_limit: int
risk_medium_threshold: float
risk_high_threshold: float
risk_anomaly_scan_limit: int
@property
def service_root(self) -> str:
base = self.base_url.rstrip("/")
infobase = self.infobase.strip().strip("/")
if infobase:
return f"{base}/{infobase}{self.odata_path}"
return f"{base}{self.odata_path}"
@property
def metadata_url(self) -> str:
return f"{self.service_root}$metadata"
def load_settings() -> OneCSettings:
env_file = PROJECT_ROOT / ".env"
if env_file.exists():
load_dotenv(env_file)
base_url = os.getenv("ONEC_BASE_URL", "http://localhost").strip()
infobase = os.getenv("ONEC_INFOBASE", "AccountingBase").strip()
username = os.getenv("ONEC_USERNAME", "").strip()
password = os.getenv("ONEC_PASSWORD", "")
odata_path = _normalize_odata_path(os.getenv("ONEC_ODATA_PATH", "/odata/standard.odata/"))
timeout = int(os.getenv("ONEC_TIMEOUT", "30").strip())
verify_tls = _as_bool(os.getenv("ONEC_VERIFY_TLS"), default=False)
probe_top = int(os.getenv("ONEC_PROBE_TOP", "5").strip())
probe_entity_sets_raw = os.getenv("ONEC_PROBE_ENTITY_SETS", "")
probe_entity_sets = tuple(
item.strip()
for item in probe_entity_sets_raw.split(",")
if item.strip()
)
canonical_db_url = os.getenv("CANONICAL_DB_URL", _default_canonical_db_url()).strip()
refresh_default_limit_per_set = int(os.getenv("REFRESH_DEFAULT_LIMIT_PER_SET", "200").strip())
refresh_default_keywords_raw = os.getenv(
"REFRESH_DEFAULT_ENTITY_KEYWORDS",
"document,posting,movement,register,account,counterparty,contract,organization,subconto,item,warehouse",
)
refresh_default_entity_keywords = tuple(
item.strip().lower()
for item in refresh_default_keywords_raw.split(",")
if item.strip()
)
feature_default_baseline_window_hours = int(os.getenv("FEATURE_BASELINE_WINDOW_HOURS", "24").strip())
anomaly_stale_refresh_threshold_hours = int(os.getenv("ANOMALY_STALE_REFRESH_THRESHOLD_HOURS", "6").strip())
feature_entity_scan_limit = int(os.getenv("FEATURE_ENTITY_SCAN_LIMIT", "200000").strip())
risk_medium_threshold = float(os.getenv("RISK_MEDIUM_THRESHOLD", "0.45").strip())
risk_high_threshold = float(os.getenv("RISK_HIGH_THRESHOLD", "0.75").strip())
risk_anomaly_scan_limit = int(os.getenv("RISK_ANOMALY_SCAN_LIMIT", "5000").strip())
return OneCSettings(
base_url=base_url,
infobase=infobase,
username=username,
password=password,
odata_path=odata_path,
timeout=timeout,
verify_tls=verify_tls,
probe_top=probe_top,
probe_entity_sets=probe_entity_sets,
canonical_db_url=canonical_db_url,
refresh_default_limit_per_set=refresh_default_limit_per_set,
refresh_default_entity_keywords=refresh_default_entity_keywords,
feature_default_baseline_window_hours=feature_default_baseline_window_hours,
anomaly_stale_refresh_threshold_hours=anomaly_stale_refresh_threshold_hours,
feature_entity_scan_limit=feature_entity_scan_limit,
risk_medium_threshold=risk_medium_threshold,
risk_high_threshold=risk_high_threshold,
risk_anomaly_scan_limit=risk_anomaly_scan_limit,
)