NODEDC_1C/canonical_layer/service.py

242 lines
8.2 KiB
Python

from __future__ import annotations

import json
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

from canonical_layer.mappers import map_record, map_records
from canonical_layer.models import CanonicalEntity
from config.client import ODataClient, extract_entity_sets
from config.settings import LOGS_DIR, load_settings
# Captures the signed millisecond count inside a "/Date(<millis>...)/" literal.
ODATA_DATE_RE = re.compile(r"/Date\((?P<millis>-?\d+)")


def _parse_dt(raw: str | None) -> datetime | None:
    """Parse a date string into a ``datetime``; return ``None`` if it cannot be parsed.

    Two formats are recognised, in this order:

    * a ``/Date(<millis>)/`` literal, read as milliseconds since
      1970-01-01T00:00:00 and returned as a *naive* datetime on the UTC
      wall clock;
    * anything ``datetime.fromisoformat`` accepts. ``Z`` is rewritten to
      ``+00:00`` first, so such values come back timezone-aware.

    Args:
        raw: Text to parse. ``None``, empty and whitespace-only values
            yield ``None``.

    Returns:
        The parsed datetime, or ``None`` when the text matches neither format
        or the tick count is outside the range ``datetime`` can represent.
    """
    if raw is None:
        return None
    text = raw.strip()
    if not text:
        return None

    match = ODATA_DATE_RE.search(text)
    if match:
        millis = int(match.group("millis"))
        # Epoch arithmetic instead of datetime.fromtimestamp(): the result must
        # not depend on the host's local timezone, and negative / far-future
        # tick counts must not raise platform-specific errors.
        try:
            return datetime(1970, 1, 1) + timedelta(milliseconds=millis)
        except OverflowError:
            return None

    # Try the "Z"-normalised form first, then the raw text as a fallback.
    for candidate in (text.replace("Z", "+00:00"), text):
        try:
            return datetime.fromisoformat(candidate)
        except ValueError:
            continue
    return None
def _naive_utc(moment: datetime) -> datetime:
    """Return *moment* as a naive datetime on the UTC wall clock."""
    offset = moment.utcoffset()
    if offset is not None:
        try:
            moment = moment - offset
        except OverflowError:
            # Within one offset of datetime.min/max: keep the wall-clock value.
            pass
    return moment.replace(tzinfo=None)


def _in_date_range(record: dict[str, Any], date_from: datetime | None, date_to: datetime | None) -> bool:
    """Tell whether *record*'s date lies inside ``[date_from, date_to]``.

    The record date is the first parseable value among the fields ``Date``,
    ``Дата``, ``Period``, ``Период``, ``PostedAt`` and ``posted_at``. Values
    that are already ``datetime`` objects are used as-is; anything else is
    stringified and handed to ``_parse_dt``.

    Timezone-aware and naive datetimes cannot be ordered against each other,
    so every value is reduced to a naive UTC wall-clock time before comparing.
    Naive values are therefore treated as if they were UTC.

    Args:
        record: Raw row to inspect.
        date_from: Inclusive lower bound, or ``None`` for no lower bound.
        date_to: Inclusive upper bound, or ``None`` for no upper bound.

    Returns:
        ``True`` when no bound is given, when the record carries no parseable
        date (such rows are deliberately kept), or when the date is within
        the bounds; ``False`` otherwise.
    """
    if date_from is None and date_to is None:
        return True

    date_candidates = ("Date", "Дата", "Period", "Период", "PostedAt", "posted_at")
    record_dt: datetime | None = None
    for field in date_candidates:
        value = record.get(field)
        if value is None:
            continue
        record_dt = value if isinstance(value, datetime) else _parse_dt(str(value))
        if record_dt is not None:
            break

    if record_dt is None:
        # Undated rows pass the filter rather than being dropped.
        return True

    moment = _naive_utc(record_dt)
    if date_from is not None and moment < _naive_utc(date_from):
        return False
    if date_to is not None and moment > _naive_utc(date_to):
        return False
    return True
@dataclass
class CanonicalService:
    """Read OData records through ``client`` and expose them as canonical entities.

    Entity sets are discovered from the files cached under ``LOGS_DIR`` and
    selected by case-insensitive keyword match on their names. All filtering
    (dates, accounts, identifiers) is done client-side on the rows returned by
    a bounded ``top=N`` read, so matches outside that window are not seen.
    """

    # Lower-case substrings identifying document entity sets.
    _DOCUMENT_KEYWORDS = ("документ", "document")
    # Lower-case substrings identifying posting / register entity sets.
    _POSTING_KEYWORDS = ("провод", "posting", "хозрасчет", "регистр")
    # Row fields probed, in order, when looking a document up by identifier.
    _ID_FIELDS = ("Ref_Key", "ID", "Id", "id")

    # OData client used for every remote read.
    client: ODataClient

    @classmethod
    def build(cls) -> "CanonicalService":
        """Create a service whose client is configured from ``load_settings()``."""
        return cls(client=ODataClient(load_settings()))

    def _load_entity_sets(self) -> list[dict[str, str]]:
        """Return entity-set descriptors from the files cached under ``LOGS_DIR``.

        ``entity_sets.json`` is preferred. ``metadata.xml`` is parsed with
        ``extract_entity_sets`` when the JSON file is missing, unreadable, or
        does not hold a list under ``"entity_sets"``. Returns ``[]`` when
        neither file exists.
        """
        entity_sets_file = LOGS_DIR / "entity_sets.json"
        if entity_sets_file.exists():
            try:
                payload = json.loads(entity_sets_file.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # ValueError covers malformed JSON and undecodable bytes alike.
                # A broken cache file should not take the service down while
                # metadata.xml may still be usable.
                logging.getLogger(__name__).warning(
                    "Ignoring unreadable entity set cache %s", entity_sets_file, exc_info=True
                )
                payload = None
            entity_sets = payload.get("entity_sets", []) if isinstance(payload, dict) else None
            if isinstance(entity_sets, list):
                return [item for item in entity_sets if isinstance(item, dict)]

        metadata_file = LOGS_DIR / "metadata.xml"
        if metadata_file.exists():
            return extract_entity_sets(metadata_file.read_text(encoding="utf-8"))
        return []

    def _sets_by_keywords(self, keywords: tuple[str, ...], limit: int = 10) -> list[str]:
        """Return up to *limit* entity-set names containing any of *keywords*.

        Names are lower-cased before matching, so *keywords* must be lower-case.
        """
        names = [str(item.get("name", "")) for item in self._load_entity_sets()]
        matching = [name for name in names if any(keyword in name.lower() for keyword in keywords)]
        return matching[:limit]

    def _safe_read(self, entity_set: str, top: int, extra_params: dict[str, Any] | None = None) -> list[dict[str, Any]]:
        """Read up to *top* rows of *entity_set*, returning ``[]`` on any failure.

        Reads are best-effort so that one failing entity set does not abort a
        multi-set query. The failure is logged instead of being dropped
        silently.
        """
        try:
            return self.client.read_entity_set_records(entity_set, top=top, extra_params=extra_params)
        except Exception:
            logging.getLogger(__name__).warning(
                "Reading entity set %r failed; treating it as empty", entity_set, exc_info=True
            )
            return []

    def list_entity_sets(self) -> dict[str, Any]:
        """Return the cached entity sets together with their count."""
        entity_sets = self._load_entity_sets()
        return {
            "total": len(entity_sets),
            "entity_sets": entity_sets,
        }

    def get_documents(self, date_from: str | None, date_to: str | None, limit: int = 100) -> list[CanonicalEntity]:
        """Return at most *limit* documents dated within ``[date_from, date_to]``.

        Up to five document entity sets are read, each with an equal share of
        *limit* (never fewer than three rows). A bound that ``_parse_dt``
        cannot parse is treated as absent.
        """
        from_dt = _parse_dt(date_from)
        to_dt = _parse_dt(date_to)
        document_sets = self._sets_by_keywords(self._DOCUMENT_KEYWORDS, limit=5)
        per_set_limit = max(3, limit // max(len(document_sets), 1))

        all_records: list[CanonicalEntity] = []
        for entity_set in document_sets:
            rows = self._safe_read(entity_set, top=per_set_limit)
            in_range = [row for row in rows if _in_date_range(row, from_dt, to_dt)]
            all_records.extend(map_records(entity_set, in_range))
        return all_records[:limit]

    def get_document(self, source_id: str) -> CanonicalEntity | None:
        """Return the document whose identifier field equals *source_id*.

        The first 50 rows of up to eight document entity sets are scanned.
        Surrounding whitespace is ignored on both sides of the comparison.
        A blank *source_id* never matches: without this guard a row that
        merely lacks an identifier field would compare equal to ``""``.
        """
        wanted = source_id.strip() if source_id else ""
        if not wanted:
            return None

        for entity_set in self._sets_by_keywords(self._DOCUMENT_KEYWORDS, limit=8):
            for row in self._safe_read(entity_set, top=50):
                for key in self._ID_FIELDS:
                    value = row.get(key)
                    if value is not None and str(value).strip() == wanted:
                        return map_record(entity_set, row)
        return None

    def get_postings(
        self,
        account: str | None,
        date_from: str | None,
        date_to: str | None,
        limit: int = 100,
    ) -> list[CanonicalEntity]:
        """Return at most *limit* postings, optionally filtered by account and date.

        *account* is matched as a case-insensitive substring of the row's JSON
        serialisation, so it hits on any field. A falsy *account* disables
        the filter. Dates follow the same rules as ``get_documents``.
        """
        from_dt = _parse_dt(date_from)
        to_dt = _parse_dt(date_to)
        posting_sets = self._sets_by_keywords(self._POSTING_KEYWORDS, limit=6)
        per_set_limit = max(3, limit // max(len(posting_sets), 1))
        needle = account.lower() if account else None

        output: list[CanonicalEntity] = []
        for entity_set in posting_sets:
            for row in self._safe_read(entity_set, top=per_set_limit):
                if needle is not None and needle not in json.dumps(row, ensure_ascii=False).lower():
                    continue
                if not _in_date_range(row, from_dt, to_dt):
                    continue
                output.append(map_record(entity_set, row))
        return output[:limit]

    def get_counterparty_documents(self, counterparty_id: str, limit: int = 100) -> list[CanonicalEntity]:
        """Return at most *limit* documents that reference *counterparty_id*.

        A document qualifies when the id occurs in its JSON-serialised
        attributes or is the target of one of its links. A document whose own
        ``source_id`` equals the id is skipped. A blank id returns ``[]``,
        since an empty substring would otherwise match every document.
        """
        needle = counterparty_id.strip() if counterparty_id else ""
        if not needle:
            return []

        result: list[CanonicalEntity] = []
        # Fetch twice the limit so that filtering still leaves enough rows.
        for doc in self.get_documents(date_from=None, date_to=None, limit=limit * 2):
            if doc.source_id == needle:
                continue
            in_attributes = needle in json.dumps(doc.attributes, ensure_ascii=False)
            if in_attributes or any(link.target_id == needle for link in doc.links):
                result.append(doc)
        return result[:limit]

    @staticmethod
    def _graph_node(node_id: str, entity: str, display_name: str) -> dict[str, Any]:
        """Build one node record of the document graph."""
        return {
            "id": node_id,
            "entity": entity,
            "display_name": display_name,
        }

    def build_document_graph(self, document_id: str) -> dict[str, Any]:
        """Build a node/edge graph around the document *document_id*.

        The graph contains the document itself, one node and edge per link of
        the document, and one ``document_to_posting`` edge for each of up to 50
        postings whose serialised attributes mention the id. When the
        document is not found, ``found`` is ``False`` and both lists are empty.
        """
        root = self.get_document(document_id)
        if root is None:
            return {
                "document_id": document_id,
                "found": False,
                "nodes": [],
                "edges": [],
            }

        nodes: dict[str, dict[str, Any]] = {
            root.source_id: self._graph_node(root.source_id, root.source_entity, root.display_name),
        }
        edges: list[dict[str, Any]] = []

        for link in root.links:
            # Link targets are not loaded, so their id doubles as display name.
            nodes.setdefault(
                link.target_id,
                self._graph_node(link.target_id, link.target_entity, link.target_id),
            )
            edges.append(
                {
                    "from": root.source_id,
                    "to": link.target_id,
                    "relation": link.relation,
                    "field": link.source_field,
                }
            )

        # get_document() matched on the stripped id, so search with the same text.
        needle = document_id.strip()
        for posting in self.get_postings(account=None, date_from=None, date_to=None, limit=50):
            if needle not in json.dumps(posting.attributes, ensure_ascii=False):
                continue
            nodes.setdefault(
                posting.source_id,
                self._graph_node(posting.source_id, posting.source_entity, posting.display_name),
            )
            edges.append(
                {
                    "from": root.source_id,
                    "to": posting.source_id,
                    "relation": "document_to_posting",
                    "field": "inferred",
                }
            )

        return {
            "document_id": document_id,
            "found": True,
            "nodes": list(nodes.values()),
            "edges": edges,
        }