from __future__ import annotations import hashlib import json import re from typing import Any from canonical_layer.models import ( Account, BankAccount, CanonicalEntity, CashflowArticle, Contract, Counterparty, Currency, Department, Document, EntityLink, Individual, InvoiceDocument, Item, Organization, Period, Posting, RegisterMovement, RegisterRecord, ResponsiblePerson, Subconto, Warehouse, ) GUID_RE = re.compile( r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$" ) ZERO_GUID = "00000000-0000-0000-0000-000000000000" MISSING_SOURCE_IDS = {"", "unknown", "none", "null", "n/a", "nan"} SOURCE_ID_FIELDS = ("Ref_Key", "Ref", "ID", "Id", "id", "Key") DISPLAY_FIELDS = ( "Description", "Presentation", "Number", "Code", "Наименование", "Представление", ) def _normalize_text(value: Any) -> str: return str(value or "").strip() def _normalize_key(value: str) -> str: lowered = value.strip().lower() return re.sub(r"[^a-zа-я0-9_]+", "", lowered) def _is_guid(value: Any) -> bool: if not isinstance(value, str): return False return bool(GUID_RE.match(value.strip())) def _is_zero_guid(value: str) -> bool: return value.strip().lower() == ZERO_GUID def _pick_first(record: dict[str, Any], field_names: tuple[str, ...], default: str) -> str: for field in field_names: value = record.get(field) normalized = _normalize_text(value) if normalized: return normalized return default def _reference_type_field(record: dict[str, Any], field: str) -> tuple[str | None, str | None]: candidates = ( f"{field}_Type", f"{field}Type", ) for candidate in candidates: if candidate in record: text = _normalize_text(record.get(candidate)) if text: return candidate, text return None, None def _build_composite_source_id(entity_set: str, record: dict[str, Any]) -> str: composite_payload = { "entity_set": entity_set, "Recorder": _normalize_text(record.get("Recorder")), "Recorder_Type": _normalize_text(record.get("Recorder_Type")), "Ref": _normalize_text(record.get("Ref")), "Ref_Type": _normalize_text(record.get("Ref_Type")), "LineNumber": _normalize_text(record.get("LineNumber")), "Period": _normalize_text(record.get("Period")), "Date": _normalize_text(record.get("Date")), } digest = hashlib.sha1( json.dumps(composite_payload, ensure_ascii=False, sort_keys=True).encode("utf-8") ).hexdigest() return f"cmp:{digest}" def _guess_entity_from_type_hint(type_hint: str) -> str | None: text = _normalize_key(type_hint) if not text: return None if "document_" in text or "документ" in text: if "счетфактур" in text or "invoice" in text: return "InvoiceDocument" return "Document" if "catalog_" in text or "справочник" in text: if "контрагент" in text or "counterparty" in text: return "Counterparty" if "договор" in text or "contract" in text: return "Contract" if "валют" in text or "currency" in text: return "Currency" if "склад" in text or "warehouse" in text: return "Warehouse" if "физическоелицо" in text or "физлиц" in text or "individual" in text: return "Individual" if "статьядвиженияденежныхсредств" in text or "cashflow" in text: return "CashflowArticle" if "подраздел" in text or "department" in text: return "Department" if "номенклатур" in text or "item" in text or "product" in text: return "Item" if "пользоват" in text or "сотрудник" in text or "employee" in text or "user" in text: return "ResponsiblePerson" if "банковскиесчета" in text or "bankaccount" in text: return "BankAccount" if "организац" in text or "organization" in text: return "Organization" if "счета" in text or "account" in text: return "Account" if "accumulationregister_" in text or "informationregister_" in text or "accountingregister_" in text: return "RegisterRecord" return None def _is_document_journal(entity_set: str) -> bool: lowered = entity_set.lower() return "documentjournal_" in lowered or "журнал" in lowered def _is_register_entity_set(entity_set: str) -> bool: lowered = entity_set.lower() return ( "register" in lowered or "регистр" in lowered or "accumulation" in lowered or "informationregister_" in lowered or "accountingregister_" in lowered ) def _is_document_entity_set(entity_set: str) -> bool: lowered = entity_set.lower() return "document_" in lowered or "document" in lowered or "документ" in lowered def _field_role(field: str) -> str | None: key = _normalize_key(field) if key in {"recorder", "регистратор"}: return "recorder" if key == "ref": return "ref" if "счетфактур" in key or "invoice" in key: return "invoice" if "поставщик" in key or "supplier" in key: return "supplier" if "покупатель" in key or "buyer" in key or "customer" in key: return "buyer" if "контрагент" in key or "counterparty" in key: return "counterparty" if "договор" in key or "contract" in key: return "contract" if "организац" in key or "organization" in key: return "organization" if "ответствен" in key or "responsible" in key: return "responsible" if "валют" in key or "currency" in key: return "currency" if "склад" in key or "warehouse" in key: return "warehouse" if "статьядвиженияденежныхсредств" in key or "cashflow" in key: return "cashflow_article" if "физлиц" in key or "individual" in key or "person" in key: return "individual" if "подраздел" in key or "department" in key: return "department" if "банковскисчет" in key or "банковскийсчет" in key or "bankaccount" in key: return "bank_account" if "счеторганизац" in key or "organizationaccount" in key: return "bank_account" if "номенклатур" in key or "товар" in key or "item" in key or "product" in key: return "item" if "счет" in key or "account" in key: return "account" return None def _role_target(role: str) -> str: mapping = { "recorder": "Document", "ref": "Document", "invoice": "InvoiceDocument", "supplier": "Counterparty", "buyer": "Counterparty", "counterparty": "Counterparty", "contract": "Contract", "organization": "Organization", "responsible": "ResponsiblePerson", "currency": "Currency", "warehouse": "Warehouse", "cashflow_article": "CashflowArticle", "individual": "Individual", "department": "Department", "bank_account": "BankAccount", "item": "Item", "account": "Account", } return mapping.get(role, "Unknown") def _role_relation(entity_set: str, role: str) -> str: if _is_document_journal(entity_set): if role == "ref": return "journal_refers_to_document" if role == "currency": return "journal_has_currency" return f"journal_{role}" if _is_register_entity_set(entity_set): mapping = { "recorder": "register_recorded_by_document", "invoice": "register_relates_to_invoice", "supplier": "register_relates_to_supplier", "buyer": "register_relates_to_buyer", "counterparty": "register_relates_to_counterparty", "contract": "register_relates_to_contract", "organization": "register_relates_to_organization", "currency": "register_relates_to_currency", "warehouse": "register_relates_to_warehouse", "bank_account": "register_relates_to_bank_account", "item": "register_relates_to_item", "account": "register_relates_to_account", "department": "register_relates_to_department", "individual": "register_relates_to_individual", "cashflow_article": "register_relates_to_cashflow_article", "responsible": "register_has_responsible", } return mapping.get(role, "register_reference") if _is_document_entity_set(entity_set): mapping = { "counterparty": "document_has_counterparty", "contract": "document_has_contract", "organization": "document_belongs_to_organization", "responsible": "document_has_responsible", "currency": "document_has_currency", "warehouse": "document_has_warehouse", "cashflow_article": "document_has_cashflow_article", "bank_account": "document_has_bank_account", "department": "document_has_department", "individual": "document_relates_to_individual", "invoice": "document_relates_to_invoice", "item": "document_line_has_item", "account": "document_line_has_account", "supplier": "document_has_supplier", "buyer": "document_has_buyer", } return mapping.get(role, "document_reference") return "reference" def _guess_target_entity(field: str) -> str: role = _field_role(field) if role is None: return "Unknown" return _role_target(role) def _is_reference_candidate(record: dict[str, Any], field: str, value: Any) -> bool: if field in SOURCE_ID_FIELDS and field != "Ref": return False if field.endswith("@navigationLinkUrl"): return False if field.endswith("_Type"): return False if isinstance(value, (dict, list)): return False normalized = _normalize_text(value) if not normalized: return False if field.endswith("_Key"): return True if field.lower().endswith("ref"): return True if _is_guid(normalized): return True if _field_role(field) is not None: return True type_field, _ = _reference_type_field(record, field) if type_field: return True return False def _resolve_relation_and_target( *, entity_set: str, field: str, value: str, record: dict[str, Any], ) -> tuple[str, str]: role = _field_role(field) type_field, type_hint = _reference_type_field(record, field) target_from_type = _guess_entity_from_type_hint(type_hint or "") if type_field and role in {"recorder", "ref"} and target_from_type == "InvoiceDocument": # Recorder/Ref should still point at document-level nodes. target_from_type = "Document" if role is None: relation = "reference" else: relation = _role_relation(entity_set, role) if target_from_type: target_entity = target_from_type elif role is not None: target_entity = _role_target(role) else: target_entity = _guess_target_entity(field) if target_entity == "Unknown": relation = "reference" if _is_zero_guid(value): return "null_reference", target_entity return relation, target_entity def _extract_links(entity_set: str, record: dict[str, Any]) -> list[EntityLink]: links: list[EntityLink] = [] for field, raw_value in record.items(): if not _is_reference_candidate(record, field, raw_value): continue text_value = _normalize_text(raw_value) if not text_value: continue if _is_zero_guid(text_value): # Keep empty/null references out of canonical graph relations. continue relation, target_entity = _resolve_relation_and_target( entity_set=entity_set, field=field, value=text_value, record=record, ) links.append( EntityLink( relation=relation, target_entity=target_entity, target_id=text_value, source_field=field, ) ) return links def _entity_cls_for_set(entity_set: str) -> type[CanonicalEntity]: lowered = entity_set.lower() if "счетфактур" in lowered or "invoice" in lowered: return InvoiceDocument if "документ" in lowered or "document" in lowered: return Document if "контраг" in lowered or "counterparty" in lowered: return Counterparty if "договор" in lowered or "contract" in lowered: return Contract if "банковск" in lowered and "счет" in lowered: return BankAccount if "валют" in lowered or "currency" in lowered: return Currency if "склад" in lowered or "warehouse" in lowered: return Warehouse if "подраздел" in lowered or "department" in lowered: return Department if "физлиц" in lowered or "individual" in lowered: return Individual if "номенклатур" in lowered or "item" in lowered or "product" in lowered: return Item if "ответствен" in lowered or "пользоват" in lowered or "employee" in lowered: return ResponsiblePerson if "статьядвиженияденежныхсредств" in lowered or "cashflow" in lowered: return CashflowArticle if "счет" in lowered or "account" in lowered: return Account if "субконто" in lowered or "subconto" in lowered: return Subconto if "движ" in lowered or "movement" in lowered: return RegisterMovement if "провод" in lowered or "posting" in lowered: return Posting if "регистр" in lowered or "register" in lowered: return RegisterRecord if "период" in lowered or "period" in lowered: return Period if "организ" in lowered or "organization" in lowered: return Organization return CanonicalEntity def _normalize_source_id(value: Any) -> str: text = _normalize_text(value) if text.lower() in MISSING_SOURCE_IDS: return "" return text def map_record(entity_set: str, record: dict[str, Any]) -> CanonicalEntity: source_id = _normalize_source_id(_pick_first(record, SOURCE_ID_FIELDS, default="")) if not source_id: source_id = _build_composite_source_id(entity_set, record) display_name = _pick_first(record, DISPLAY_FIELDS, default=source_id) canonical_cls = _entity_cls_for_set(entity_set) return canonical_cls( source_entity=entity_set, source_id=source_id, display_name=display_name, attributes=record, links=_extract_links(entity_set, record), ) def map_records(entity_set: str, records: list[dict[str, Any]]) -> list[CanonicalEntity]: return [map_record(entity_set, record) for record in records] def canonical_relation_rule_catalog() -> list[dict[str, str]]: return [ {"context": "register", "role": "recorder", "relation": "register_recorded_by_document"}, {"context": "journal", "role": "ref", "relation": "journal_refers_to_document"}, {"context": "document", "role": "counterparty", "relation": "document_has_counterparty"}, {"context": "document", "role": "contract", "relation": "document_has_contract"}, {"context": "document", "role": "organization", "relation": "document_belongs_to_organization"}, {"context": "document", "role": "responsible", "relation": "document_has_responsible"}, {"context": "document", "role": "currency", "relation": "document_has_currency"}, {"context": "document", "role": "warehouse", "relation": "document_has_warehouse"}, {"context": "document", "role": "cashflow_article", "relation": "document_has_cashflow_article"}, {"context": "document", "role": "bank_account", "relation": "document_has_bank_account"}, {"context": "register", "role": "supplier", "relation": "register_relates_to_supplier"}, {"context": "register", "role": "buyer", "relation": "register_relates_to_buyer"}, {"context": "register", "role": "invoice", "relation": "register_relates_to_invoice"}, {"context": "register", "role": "contract", "relation": "register_relates_to_contract"}, {"context": "register", "role": "organization", "relation": "register_relates_to_organization"}, {"context": "register", "role": "account", "relation": "register_relates_to_account"}, {"context": "register", "role": "item", "relation": "register_relates_to_item"}, ]