242 lines
8.2 KiB
Python
242 lines
8.2 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
import json
|
|
from pathlib import Path
|
|
import re
|
|
from typing import Any
|
|
|
|
from config.client import ODataClient, extract_entity_sets
|
|
from config.settings import LOGS_DIR, load_settings
|
|
from canonical_layer.mappers import map_record, map_records
|
|
from canonical_layer.models import CanonicalEntity
|
|
|
|
|
|
ODATA_DATE_RE = re.compile(r"/Date\((?P<millis>-?\d+)")
|
|
|
|
|
|
def _parse_dt(raw: str | None) -> datetime | None:
|
|
if raw is None:
|
|
return None
|
|
text = raw.strip()
|
|
if not text:
|
|
return None
|
|
|
|
match = ODATA_DATE_RE.search(text)
|
|
if match:
|
|
millis = int(match.group("millis"))
|
|
return datetime.fromtimestamp(millis / 1000)
|
|
|
|
for candidate in (text.replace("Z", "+00:00"), text):
|
|
try:
|
|
return datetime.fromisoformat(candidate)
|
|
except ValueError:
|
|
continue
|
|
return None
|
|
|
|
|
|
def _in_date_range(record: dict[str, Any], date_from: datetime | None, date_to: datetime | None) -> bool:
|
|
if date_from is None and date_to is None:
|
|
return True
|
|
|
|
date_candidates = ("Date", "Дата", "Period", "Период", "PostedAt", "posted_at")
|
|
record_dt: datetime | None = None
|
|
for field in date_candidates:
|
|
value = record.get(field)
|
|
if value is None:
|
|
continue
|
|
record_dt = _parse_dt(str(value))
|
|
if record_dt is not None:
|
|
break
|
|
|
|
if record_dt is None:
|
|
return True
|
|
if date_from and record_dt < date_from:
|
|
return False
|
|
if date_to and record_dt > date_to:
|
|
return False
|
|
return True
|
|
|
|
|
|
@dataclass
class CanonicalService:
    """Expose OData entity sets as canonical entities.

    Entity-set names come from the discovery files cached in ``LOGS_DIR`` (see
    ``_load_entity_sets``) and are selected by keyword match on the set name.
    All reads go through ``_safe_read``, so a set that fails to load
    contributes no rows instead of aborting the whole query.
    """

    client: ODataClient

    @classmethod
    def build(cls) -> "CanonicalService":
        """Create a service whose ``ODataClient`` is configured from ``load_settings()``."""
        settings = load_settings()
        return cls(client=ODataClient(settings))

    @staticmethod
    def _per_set_limit(limit: int, set_count: int) -> int:
        """Return how many rows to request from each of *set_count* entity sets.

        Ceiling division lets the sets together supply *limit* rows. Floor
        division fell short: 100 rows over 3 sets requested only 3 * 33 = 99.
        At least 3 rows are always requested per set.
        """
        return max(3, -(-limit // max(set_count, 1)))

    def _load_entity_sets(self) -> list[dict[str, str]]:
        """Return entity-set descriptors cached in ``LOGS_DIR``.

        ``entity_sets.json`` is preferred. When it holds a JSON object whose
        ``"entity_sets"`` value is a list, that list is returned with non-dict
        items dropped; a missing key yields ``[]``.

        If the file is absent, is not valid UTF-8/JSON, or has another shape,
        the sets are extracted from ``metadata.xml`` instead. Returns ``[]``
        when neither source is available.
        """
        entity_sets_file = LOGS_DIR / "entity_sets.json"
        if entity_sets_file.exists():
            try:
                payload = json.loads(entity_sets_file.read_text(encoding="utf-8"))
            except ValueError:
                # JSONDecodeError and UnicodeDecodeError both derive from
                # ValueError; a corrupt cache should not break discovery.
                import logging

                logging.getLogger(__name__).warning(
                    "Ignoring unreadable %s", entity_sets_file, exc_info=True
                )
                payload = None
            if isinstance(payload, dict):
                entity_sets = payload.get("entity_sets", [])
                if isinstance(entity_sets, list):
                    return [item for item in entity_sets if isinstance(item, dict)]

        metadata_file = LOGS_DIR / "metadata.xml"
        if metadata_file.exists():
            return extract_entity_sets(metadata_file.read_text(encoding="utf-8"))
        return []

    def _sets_by_keywords(self, keywords: tuple[str, ...], limit: int = 10) -> list[str]:
        """Return up to *limit* entity-set names containing any of *keywords*.

        Matching is a case-insensitive substring test on each descriptor's
        ``"name"``. Order follows ``_load_entity_sets``.
        """
        needles = tuple(keyword.lower() for keyword in keywords)
        names = [
            name
            for name in (str(item.get("name", "")) for item in self._load_entity_sets())
            if any(needle in name.lower() for needle in needles)
        ]
        return names[:limit]

    def _safe_read(self, entity_set: str, top: int, extra_params: dict[str, Any] | None = None) -> list[dict[str, Any]]:
        """Read up to *top* records from *entity_set*, returning ``[]`` on any error.

        Failures are deliberately non-fatal, so one broken set does not break a
        multi-set query. They are logged with the traceback rather than being
        silently discarded.
        """
        try:
            return self.client.read_entity_set_records(entity_set, top=top, extra_params=extra_params)
        except Exception:
            import logging

            logging.getLogger(__name__).warning(
                "Failed to read entity set %r", entity_set, exc_info=True
            )
            return []

    def list_entity_sets(self) -> dict[str, Any]:
        """Return ``{"total": <count>, "entity_sets": <descriptors>}``."""
        entity_sets = self._load_entity_sets()
        return {
            "total": len(entity_sets),
            "entity_sets": entity_sets,
        }

    def get_documents(self, date_from: str | None, date_to: str | None, limit: int = 100) -> list[CanonicalEntity]:
        """Return up to *limit* documents, optionally filtered by date.

        Candidate sets are the first 5 whose name contains "документ" or
        "document". Each set is asked for its share of *limit* rows. The date
        filter (``_in_date_range``) is then applied client-side to the rows
        returned, so fewer than *limit* results can come back even when more
        matching rows exist upstream.

        A non-positive *limit* returns ``[]``.
        """
        if limit <= 0:
            return []
        from_dt = _parse_dt(date_from)
        to_dt = _parse_dt(date_to)
        document_sets = self._sets_by_keywords(("документ", "document"), limit=5)
        per_set_limit = self._per_set_limit(limit, len(document_sets))

        all_records: list[CanonicalEntity] = []
        for entity_set in document_sets:
            records = self._safe_read(entity_set, top=per_set_limit)
            filtered = [row for row in records if _in_date_range(row, from_dt, to_dt)]
            all_records.extend(map_records(entity_set, filtered))
            if len(all_records) >= limit:
                break  # rows from later sets would be truncated away anyway
        return all_records[:limit]

    def get_document(self, source_id: str) -> CanonicalEntity | None:
        """Find the document whose ``Ref_Key``/``ID``/``Id``/``id`` equals *source_id*.

        Scans the first 50 rows of up to 8 document sets. Ids are compared
        after stripping surrounding whitespace. Returns the mapped entity, or
        ``None`` when nothing matches.

        A blank *source_id* returns ``None`` immediately. Previously a missing
        key defaulted to ``""`` and matched it.
        """
        needle = (source_id or "").strip()
        if not needle:
            return None
        id_keys = ("Ref_Key", "ID", "Id", "id")
        for entity_set in self._sets_by_keywords(("документ", "document"), limit=8):
            for row in self._safe_read(entity_set, top=50):
                if any(
                    row.get(key) is not None and str(row[key]).strip() == needle
                    for key in id_keys
                ):
                    return map_record(entity_set, row)
        return None

    def get_postings(
        self,
        account: str | None,
        date_from: str | None,
        date_to: str | None,
        limit: int = 100,
    ) -> list[CanonicalEntity]:
        """Return up to *limit* posting/register records.

        Candidate sets are the first 6 whose name contains "провод", "posting",
        "хозрасчет" or "регистр".

        When *account* is given, a row is kept if the lower-cased account
        string occurs anywhere in the row's JSON text. This is a heuristic
        substring match, so e.g. "10" also matches "100". Rows are then
        date-filtered client-side with ``_in_date_range``.

        A non-positive *limit* returns ``[]``.
        """
        if limit <= 0:
            return []
        from_dt = _parse_dt(date_from)
        to_dt = _parse_dt(date_to)
        posting_sets = self._sets_by_keywords(("провод", "posting", "хозрасчет", "регистр"), limit=6)
        per_set_limit = self._per_set_limit(limit, len(posting_sets))
        account_needle = account.lower() if account else None

        output: list[CanonicalEntity] = []
        for entity_set in posting_sets:
            for row in self._safe_read(entity_set, top=per_set_limit):
                if account_needle and account_needle not in json.dumps(row, ensure_ascii=False).lower():
                    continue
                if not _in_date_range(row, from_dt, to_dt):
                    continue
                output.append(map_record(entity_set, row))
                if len(output) >= limit:
                    return output
        return output

    def get_counterparty_documents(self, counterparty_id: str, limit: int = 100) -> list[CanonicalEntity]:
        """Return up to *limit* documents that reference *counterparty_id*.

        Searches ``get_documents(limit=2 * limit)``. A document matches when
        the id occurs in its JSON-serialized ``attributes`` or equals one of
        its links' ``target_id``. The entity whose own ``source_id`` is the id
        is skipped.

        A blank id (which used to match everything, since ``"" in s`` is
        always true) or a non-positive *limit* returns ``[]``.
        """
        if not counterparty_id or limit <= 0:
            return []
        documents = self.get_documents(date_from=None, date_to=None, limit=limit * 2)

        result: list[CanonicalEntity] = []
        for doc in documents:
            if doc.source_id == counterparty_id:
                continue
            mentioned = counterparty_id in json.dumps(doc.attributes, ensure_ascii=False)
            if mentioned or any(link.target_id == counterparty_id for link in doc.links):
                result.append(doc)
                if len(result) >= limit:
                    break
        return result

    @staticmethod
    def _graph_node(node_id: str, entity: Any, display_name: Any) -> dict[str, Any]:
        """Build one node dict in the shape used by ``build_document_graph``."""
        return {"id": node_id, "entity": entity, "display_name": display_name}

    def build_document_graph(self, document_id: str) -> dict[str, Any]:
        """Return the link graph around the document *document_id*.

        The result has ``document_id``, ``found``, ``nodes`` and ``edges``.
        Edges go from the root document to each of its ``links``. Inferred
        ``document_to_posting`` edges are added for every posting (among the
        first 50 from ``get_postings``) whose JSON-serialized attributes
        contain the document id. Nodes are de-duplicated by id; the first
        occurrence wins.
        """
        root = self.get_document(document_id)
        if root is None:
            return {
                "document_id": document_id,
                "found": False,
                "nodes": [],
                "edges": [],
            }

        nodes: dict[str, dict[str, Any]] = {
            root.source_id: self._graph_node(root.source_id, root.source_entity, root.display_name),
        }
        edges: list[dict[str, Any]] = []

        for link in root.links:
            target = link.target_id
            # Linked targets are not loaded, so their id doubles as display name.
            nodes.setdefault(target, self._graph_node(target, link.target_entity, target))
            edges.append(
                {
                    "from": root.source_id,
                    "to": target,
                    "relation": link.relation,
                    "field": link.source_field,
                }
            )

        # get_document matched on the stripped id, so search postings for the same text.
        needle = document_id.strip()
        for posting in self.get_postings(account=None, date_from=None, date_to=None, limit=50):
            if needle not in json.dumps(posting.attributes, ensure_ascii=False):
                continue
            nodes.setdefault(
                posting.source_id,
                self._graph_node(posting.source_id, posting.source_entity, posting.display_name),
            )
            edges.append(
                {
                    "from": root.source_id,
                    "to": posting.source_id,
                    "relation": "document_to_posting",
                    "field": "inferred",
                }
            )

        return {
            "document_id": document_id,
            "found": True,
            "nodes": list(nodes.values()),
            "edges": edges,
        }
|
|
|