"""Tests for ``RefreshService`` runs against a SQLite-backed ``CanonicalStore``.

A stub client serves canned records (or raises a canned exception) per entity
set, so the tests never talk to a real endpoint.
"""
from __future__ import annotations

from pathlib import Path

from canonical_layer.refresh import RefreshService
from canonical_layer.store import CanonicalStore
from config.settings import OneCSettings


class FakeClient:
    """Stand-in client that answers from a fixed per-entity-set mapping.

    Each mapping value is either a list of record dicts to serve, or an
    ``Exception`` instance that is raised when that entity set is read.
    """

    def __init__(self, payload_by_set: dict[str, object]) -> None:
        self.payload_by_set = payload_by_set

    def read_entity_set_records(
        self,
        entity_set: str,
        top: int = 5,
        extra_params: dict | None = None,
    ) -> list[dict]:
        """Return at most ``top`` canned records for ``entity_set``.

        Unknown entity sets yield an empty list; a canned exception is raised
        as-is. ``extra_params`` is accepted but ignored.
        """
        canned = self.payload_by_set.get(entity_set, [])
        if isinstance(canned, Exception):
            raise canned
        # Anything that is not an exception must be a list of records.
        assert isinstance(canned, list)
        limited = canned[:top]
        return limited

    def fetch_metadata(self) -> str:
        """Return an empty metadata document."""
        return ""


def _build_settings(db_url: str) -> OneCSettings:
    """Build ``OneCSettings`` with fixed test values and the given ``db_url``.

    Only ``canonical_db_url`` varies between callers; every other field is a
    constant chosen for the tests.
    """
    field_values: dict[str, object] = {
        "base_url": "http://localhost",
        "infobase": "buh_test",
        "username": "",
        "password": "",
        "odata_path": "/odata/standard.odata/",
        "timeout": 30,
        "verify_tls": False,
        "probe_top": 5,
        "probe_entity_sets": (),
        "canonical_db_url": db_url,
        "refresh_default_limit_per_set": 50,
        "refresh_default_entity_keywords": ("document", "posting"),
        "feature_default_baseline_window_hours": 24,
        "anomaly_stale_refresh_threshold_hours": 6,
        "feature_entity_scan_limit": 200000,
        "risk_medium_threshold": 0.45,
        "risk_high_threshold": 0.75,
        "risk_anomaly_scan_limit": 5000,
    }
    return OneCSettings(**field_values)


def test_refresh_writes_entities_and_links(tmp_path: Path) -> None:
    """A historical refresh of one healthy entity set reports and stores both records."""
    db_file = tmp_path / "canonical.db"
    settings = _build_settings(f"sqlite:///{db_file.as_posix()}")

    # NOTE(review): the Counterparty_Key reference is presumably what yields
    # the link asserted below -- confirm against RefreshService.
    doc_with_reference = {
        "Ref_Key": "11111111-2222-3333-4444-555555555555",
        "Description": "Doc 1",
        "Counterparty_Key": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
    }
    doc_without_reference = {
        "Ref_Key": "66666666-7777-8888-9999-000000000000",
        "Description": "Doc 2",
    }
    client = FakeClient({"DocumentSales": [doc_with_reference, doc_without_reference]})
    store = CanonicalStore(settings.canonical_db_url)
    service = RefreshService(settings=settings, client=client, store=store)

    result = service.run_refresh(
        mode="historical",
        requested_entity_sets=["DocumentSales"],
        limit_per_set=100,
    )

    # Outcome reported by the run itself.
    assert result.status == "success"
    assert result.records_read == 2
    assert result.entities_written == 2
    assert result.links_written >= 1

    # Totals reported by the service afterwards.
    totals = service.store_stats()
    assert totals["entities_total"] == 2
    assert totals["checkpoints_total"] == 1

    # The run must show up in the recent-run listing, newest entry first.
    recent_runs = service.list_recent_runs(limit=5)
    assert recent_runs
    newest_run = recent_runs[0]
    assert newest_run["status"] == "success"


def test_refresh_partial_success_when_one_set_fails(tmp_path: Path) -> None:
    """One entity set raising while another succeeds yields ``partial_success``."""
    db_file = tmp_path / "canonical_partial.db"
    settings = _build_settings(f"sqlite:///{db_file.as_posix()}")

    healthy_records = [
        {"Ref_Key": "11111111-2222-3333-4444-555555555555", "Description": "Doc 1"},
    ]
    # FakeClient raises this exception when "BrokenSet" is read.
    read_failure = RuntimeError("boom")
    client = FakeClient({"DocumentSales": healthy_records, "BrokenSet": read_failure})
    store = CanonicalStore(settings.canonical_db_url)
    service = RefreshService(settings=settings, client=client, store=store)

    result = service.run_refresh(
        mode="incremental",
        requested_entity_sets=["DocumentSales", "BrokenSet"],
        limit_per_set=100,
    )

    assert result.status == "partial_success"
    assert result.successful_entity_sets == ["DocumentSales"]
    assert result.failed_entity_sets