from __future__ import annotations from dataclasses import dataclass from datetime import datetime import json from pathlib import Path import re from typing import Any from config.client import ODataClient, extract_entity_sets from config.settings import LOGS_DIR, load_settings from canonical_layer.mappers import map_record, map_records from canonical_layer.models import CanonicalEntity ODATA_DATE_RE = re.compile(r"/Date\((?P-?\d+)") def _parse_dt(raw: str | None) -> datetime | None: if raw is None: return None text = raw.strip() if not text: return None match = ODATA_DATE_RE.search(text) if match: millis = int(match.group("millis")) return datetime.fromtimestamp(millis / 1000) for candidate in (text.replace("Z", "+00:00"), text): try: return datetime.fromisoformat(candidate) except ValueError: continue return None def _in_date_range(record: dict[str, Any], date_from: datetime | None, date_to: datetime | None) -> bool: if date_from is None and date_to is None: return True date_candidates = ("Date", "Дата", "Period", "Период", "PostedAt", "posted_at") record_dt: datetime | None = None for field in date_candidates: value = record.get(field) if value is None: continue record_dt = _parse_dt(str(value)) if record_dt is not None: break if record_dt is None: return True if date_from and record_dt < date_from: return False if date_to and record_dt > date_to: return False return True @dataclass class CanonicalService: client: ODataClient @classmethod def build(cls) -> "CanonicalService": settings = load_settings() return cls(client=ODataClient(settings)) def _load_entity_sets(self) -> list[dict[str, str]]: entity_sets_file = LOGS_DIR / "entity_sets.json" if entity_sets_file.exists(): payload = json.loads(entity_sets_file.read_text(encoding="utf-8")) entity_sets = payload.get("entity_sets", []) if isinstance(entity_sets, list): return [item for item in entity_sets if isinstance(item, dict)] metadata_file = LOGS_DIR / "metadata.xml" if metadata_file.exists(): metadata_xml = metadata_file.read_text(encoding="utf-8") return extract_entity_sets(metadata_xml) return [] def _sets_by_keywords(self, keywords: tuple[str, ...], limit: int = 10) -> list[str]: names: list[str] = [] for item in self._load_entity_sets(): name = str(item.get("name", "")) lowered = name.lower() if any(keyword in lowered for keyword in keywords): names.append(name) return names[:limit] def _safe_read(self, entity_set: str, top: int, extra_params: dict[str, Any] | None = None) -> list[dict[str, Any]]: try: return self.client.read_entity_set_records(entity_set, top=top, extra_params=extra_params) except Exception: return [] def list_entity_sets(self) -> dict[str, Any]: entity_sets = self._load_entity_sets() return { "total": len(entity_sets), "entity_sets": entity_sets, } def get_documents(self, date_from: str | None, date_to: str | None, limit: int = 100) -> list[CanonicalEntity]: from_dt = _parse_dt(date_from) to_dt = _parse_dt(date_to) document_sets = self._sets_by_keywords(("документ", "document"), limit=5) all_records: list[CanonicalEntity] = [] per_set_limit = max(3, limit // max(len(document_sets), 1)) for entity_set in document_sets: records = self._safe_read(entity_set, top=per_set_limit) filtered = [row for row in records if _in_date_range(row, from_dt, to_dt)] all_records.extend(map_records(entity_set, filtered)) return all_records[:limit] def get_document(self, source_id: str) -> CanonicalEntity | None: document_sets = self._sets_by_keywords(("документ", "document"), limit=8) for entity_set in document_sets: records = self._safe_read(entity_set, top=50) for row in records: for key in ("Ref_Key", "ID", "Id", "id"): if str(row.get(key, "")).strip() == source_id: return map_record(entity_set, row) return None def get_postings( self, account: str | None, date_from: str | None, date_to: str | None, limit: int = 100, ) -> list[CanonicalEntity]: from_dt = _parse_dt(date_from) to_dt = _parse_dt(date_to) posting_sets = self._sets_by_keywords(("провод", "posting", "хозрасчет", "регистр"), limit=6) output: list[CanonicalEntity] = [] per_set_limit = max(3, limit // max(len(posting_sets), 1)) for entity_set in posting_sets: records = self._safe_read(entity_set, top=per_set_limit) for row in records: if account: row_string = json.dumps(row, ensure_ascii=False).lower() if account.lower() not in row_string: continue if not _in_date_range(row, from_dt, to_dt): continue output.append(map_record(entity_set, row)) return output[:limit] def get_counterparty_documents(self, counterparty_id: str, limit: int = 100) -> list[CanonicalEntity]: documents = self.get_documents(date_from=None, date_to=None, limit=limit * 2) result: list[CanonicalEntity] = [] for doc in documents: if doc.source_id == counterparty_id: continue serialized = json.dumps(doc.attributes, ensure_ascii=False) if counterparty_id in serialized: result.append(doc) continue for link in doc.links: if link.target_id == counterparty_id: result.append(doc) break return result[:limit] def build_document_graph(self, document_id: str) -> dict[str, Any]: root = self.get_document(document_id) if root is None: return { "document_id": document_id, "found": False, "nodes": [], "edges": [], } nodes: dict[str, dict[str, Any]] = { root.source_id: { "id": root.source_id, "entity": root.source_entity, "display_name": root.display_name, } } edges: list[dict[str, Any]] = [] for link in root.links: node_id = link.target_id nodes.setdefault( node_id, { "id": node_id, "entity": link.target_entity, "display_name": node_id, }, ) edges.append( { "from": root.source_id, "to": node_id, "relation": link.relation, "field": link.source_field, } ) postings = self.get_postings(account=None, date_from=None, date_to=None, limit=50) for posting in postings: serialized = json.dumps(posting.attributes, ensure_ascii=False) if document_id not in serialized: continue nodes.setdefault( posting.source_id, { "id": posting.source_id, "entity": posting.source_entity, "display_name": posting.display_name, }, ) edges.append( { "from": root.source_id, "to": posting.source_id, "relation": "document_to_posting", "field": "inferred", } ) return { "document_id": document_id, "found": True, "nodes": list(nodes.values()), "edges": edges, }