# NODEDC_1C/tests/test_refresh_store.py
#
# 112 lines
# 3.6 KiB
# Python

from __future__ import annotations
from pathlib import Path
from canonical_layer.refresh import RefreshService
from canonical_layer.store import CanonicalStore
from config.settings import OneCSettings
class FakeClient:
    """In-memory stand-in for the OData client used by the refresh tests.

    ``payload_by_set`` maps an entity-set name to either a list of record
    dicts (returned by reads) or an ``Exception`` instance (raised by reads,
    to simulate a failing entity set).
    """

    def __init__(self, payload_by_set: dict[str, object]) -> None:
        self.payload_by_set = payload_by_set

    def read_entity_set_records(
        self,
        entity_set: str,
        top: int = 5,
        extra_params: dict | None = None,
    ) -> list[dict]:
        """Return at most ``top`` configured records for ``entity_set``.

        Unknown entity sets yield an empty list. ``extra_params`` is accepted
        but ignored by this fake.

        Raises:
            Exception: the configured exception instance, when the entry for
                ``entity_set`` is an exception.
            AssertionError: when the configured entry is neither a list nor
                an exception (a mis-built fixture).
        """
        payload = self.payload_by_set.get(entity_set, [])
        if isinstance(payload, Exception):
            raise payload
        # Explicit raise instead of a bare ``assert`` so the fixture check
        # still runs under ``python -O``; the exception type is unchanged.
        if not isinstance(payload, list):
            raise AssertionError(
                f"FakeClient payload for {entity_set!r} must be a list or an "
                f"Exception, got {type(payload).__name__}"
            )
        return payload[:top]

    def fetch_metadata(self) -> str:
        """Return a minimal EDMX document consisting of one empty root element."""
        return "<edmx:Edmx xmlns:edmx='http://docs.oasis-open.org/odata/ns/edmx'/>"
def _build_settings(db_url: str) -> OneCSettings:
    """Build an ``OneCSettings`` with fixed test values and the given store URL.

    Only ``canonical_db_url`` varies between tests; every other field is a
    constant chosen here.
    """
    return OneCSettings(
        # Connection / OData endpoint fields.
        base_url="http://localhost",
        infobase="buh_test",
        username="",
        password="",
        odata_path="/odata/standard.odata/",
        timeout=30,
        verify_tls=False,
        # Probe fields.
        probe_top=5,
        probe_entity_sets=(),
        # Canonical store and refresh defaults.
        canonical_db_url=db_url,
        refresh_default_limit_per_set=50,
        refresh_default_entity_keywords=("document", "posting"),
        # Feature / anomaly / risk tuning values.
        feature_default_baseline_window_hours=24,
        anomaly_stale_refresh_threshold_hours=6,
        feature_entity_scan_limit=200000,
        risk_medium_threshold=0.45,
        risk_high_threshold=0.75,
        risk_anomaly_scan_limit=5000,
    )
def test_refresh_writes_entities_and_links(tmp_path: Path) -> None:
    """A historical refresh of one entity set records entities, links and a run.

    Two ``DocumentSales`` records are served by the fake client; the first
    also carries a ``Counterparty_Key`` (presumably the source of the link
    the test expects — confirm against ``RefreshService``).
    """
    database_file = tmp_path / "canonical.db"
    settings = _build_settings(f"sqlite:///{database_file.as_posix()}")

    doc_with_counterparty = {
        "Ref_Key": "11111111-2222-3333-4444-555555555555",
        "Description": "Doc 1",
        "Counterparty_Key": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
    }
    doc_plain = {
        "Ref_Key": "66666666-7777-8888-9999-000000000000",
        "Description": "Doc 2",
    }
    client = FakeClient({"DocumentSales": [doc_with_counterparty, doc_plain]})

    store = CanonicalStore(settings.canonical_db_url)
    service = RefreshService(settings=settings, client=client, store=store)

    outcome = service.run_refresh(
        mode="historical",
        requested_entity_sets=["DocumentSales"],
        limit_per_set=100,
    )

    # Counters reported by the refresh run itself.
    assert outcome.status == "success"
    assert outcome.records_read == 2
    assert outcome.entities_written == 2
    assert outcome.links_written >= 1

    # State as reported by the service's store statistics.
    store_stats = service.store_stats()
    assert store_stats["entities_total"] == 2
    assert store_stats["checkpoints_total"] == 1

    # The run must be listed, with the first listed entry marked successful.
    recent_runs = service.list_recent_runs(limit=5)
    assert recent_runs
    assert recent_runs[0]["status"] == "success"
def test_refresh_partial_success_when_one_set_fails(tmp_path: Path) -> None:
    """An incremental refresh reports ``partial_success`` when one set errors.

    ``DocumentSales`` serves a single record, while ``BrokenSet`` is
    configured with a ``RuntimeError`` that the fake client raises on read.
    """
    database_file = tmp_path / "canonical_partial.db"
    settings = _build_settings(f"sqlite:///{database_file.as_posix()}")

    payloads = {
        "DocumentSales": [
            {"Ref_Key": "11111111-2222-3333-4444-555555555555", "Description": "Doc 1"},
        ],
        "BrokenSet": RuntimeError("boom"),
    }
    client = FakeClient(payloads)

    store = CanonicalStore(settings.canonical_db_url)
    service = RefreshService(settings=settings, client=client, store=store)

    outcome = service.run_refresh(
        mode="incremental",
        requested_entity_sets=["DocumentSales", "BrokenSet"],
        limit_per_set=100,
    )

    assert outcome.status == "partial_success"
    assert outcome.successful_entity_sets == ["DocumentSales"]
    # Only non-emptiness is checked; the element shape is not pinned here.
    assert outcome.failed_entity_sets