from __future__ import annotations from collections import defaultdict from decimal import Decimal, InvalidOperation import json from pathlib import Path import re from typing import Any import sys import xml.etree.ElementTree as ET PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from config.client import ODataClient, utc_now_iso from config.settings import LOGS_DIR, load_settings POSTING_ENTITY_SET = "AccountingRegister_Хозрасчетный_RecordType" POSTING_FIELDS = [ "Recorder", "Recorder_Type", "LineNumber", "Period", "Организация_Key", "AccountDr_Key", "AccountCr_Key", "Сумма", ] def _extract_rows(payload: dict[str, Any]) -> list[dict[str, Any]]: rows = payload.get("value") if rows is None and isinstance(payload.get("d"), dict): rows = payload["d"].get("results") if rows is None: return [] if isinstance(rows, list): return rows return [rows] def _safe_read( client: ODataClient, entity_set: str, *, select_fields: list[str], top: int = 200, ) -> list[dict[str, Any]]: params: dict[str, Any] = {"$select": ",".join(select_fields)} try: response = client.read_entity_set(entity_set, top=top, extra_params=params) return _extract_rows(response.payload) except Exception as exc: print(f"[warn] read failed for {entity_set}: {exc.__class__.__name__}") return [] def _to_decimal(value: Any) -> Decimal: if value is None: return Decimal("0") if isinstance(value, Decimal): return value if isinstance(value, (int, float)): return Decimal(str(value)) raw = str(value).strip().replace(",", ".") if not raw: return Decimal("0") try: return Decimal(raw) except InvalidOperation: return Decimal("0") def _to_line_key(value: Any) -> str: if value is None: return "" return str(value) def _parse_metadata_candidates(metadata_path: Path) -> list[dict[str, Any]]: root = ET.fromstring(metadata_path.read_text(encoding="utf-8")) entity_type_props: dict[str, list[str]] = {} for node in root.iter(): if not node.tag.endswith("EntityType"): continue name = node.attrib.get("Name", "") if not name: continue props = [ child.attrib.get("Name", "") for child in node if child.tag.endswith("Property") and child.attrib.get("Name") ] entity_type_props[name] = props candidates: list[dict[str, Any]] = [] for node in root.iter(): if not node.tag.endswith("EntitySet"): continue set_name = node.attrib.get("Name", "") entity_type_full = node.attrib.get("EntityType", "") if not set_name or not entity_type_full: continue entity_type_name = entity_type_full.split(".")[-1] props = entity_type_props.get(entity_type_name, []) if not set_name.startswith("Document_"): continue if "Ref_Key" not in props or "LineNumber" not in props: continue subconto_fields = [ p for p in props if ("Субконто" in p or "Subconto" in p) and not p.endswith("_Type") ] if not subconto_fields: continue type_fields = [ p for p in props if ("Субконто" in p or "Subconto" in p) and p.endswith("_Type") ] candidates.append( { "entity_set": set_name, "subconto_fields": subconto_fields, "subconto_type_fields": type_fields, "available_props": props, } ) return sorted(candidates, key=lambda item: item["entity_set"].lower()) def _derive_recorder_type(entity_set: str) -> str: if "_" not in entity_set: return f"StandardODATA.{entity_set}" base_doc = entity_set.rsplit("_", 1)[0] return f"StandardODATA.{base_doc}" def _derive_select_fields( subconto_fields: list[str], subconto_type_fields: list[str], available_props: list[str], ) -> list[str]: allowed = set(available_props) common = {"Ref_Key", "LineNumber"} relation_hints = [ "Контрагент_Key", "ДоговорКонтрагента_Key", "Номенклатура_Key", "Контрагент", "ДоговорКонтрагента", "Номенклатура", ] for name in relation_hints: if name in allowed: common.add(name) for name in subconto_fields: common.add(name) type_name = f"{name}_Type" if type_name in allowed: common.add(type_name) for name in subconto_type_fields: common.add(name) return sorted(common) def _categorize_type(raw: Any) -> str | None: value = str(raw or "") lowered = value.lower() if ( "договор" in lowered or "contract" in lowered or "äîãîâîð" in lowered ): return "contract" if ( "контрагент" in lowered or "counterparty" in lowered or "êîíòðàãåíò" in lowered ): return "counterparty" if ( "номенклатур" in lowered or "item" in lowered or "nomencl" in lowered or "íîìåíêëàòóð" in lowered ): return "item" return None def _slot_id(field_name: str) -> str: match = re.search(r"(\d+)(?:_Type)?$", field_name) if match: return match.group(1) return "1" def _build_posting_indexes(posting_rows: list[dict[str, Any]]) -> tuple[dict[tuple[str, str, str], dict[str, Any]], dict[str, list[dict[str, Any]]]]: by_triple: dict[tuple[str, str, str], dict[str, Any]] = {} by_account: dict[str, list[dict[str, Any]]] = defaultdict(list) for row in posting_rows: recorder = row.get("Recorder") recorder_type = row.get("Recorder_Type") line_key = _to_line_key(row.get("LineNumber")) if isinstance(recorder, str) and isinstance(recorder_type, str) and line_key: by_triple[(recorder, recorder_type, line_key)] = row dr = row.get("AccountDr_Key") cr = row.get("AccountCr_Key") if isinstance(dr, str) and dr: by_account[dr].append(row) if isinstance(cr, str) and cr: by_account[cr].append(row) return by_triple, by_account def _load_slot3_recon_summary() -> dict[str, Any]: report_path = LOGS_DIR / "slot3_recon_report.json" if not report_path.exists(): return { "report_found": False, "rows_with_non_null_slot3_total": 0, "rows_with_joined_slot3_total": 0, } try: payload = json.loads(report_path.read_text(encoding="utf-8")) except Exception: return { "report_found": False, "rows_with_non_null_slot3_total": 0, "rows_with_joined_slot3_total": 0, } totals = payload.get("totals", {}) return { "report_found": True, "rows_with_non_null_slot3_total": int(totals.get("rows_with_non_null_slot3_total", 0) or 0), "rows_with_joined_slot3_total": int(totals.get("rows_with_joined_slot3_total", 0) or 0), } def main() -> int: settings = load_settings() client = ODataClient(settings) metadata_path = LOGS_DIR / "metadata.xml" if not metadata_path.exists(): print("[error] metadata.xml not found. Run probe first.") return 1 posting_rows = _safe_read( client, POSTING_ENTITY_SET, select_fields=POSTING_FIELDS, top=8000, ) if not posting_rows: print("[error] no posting rows fetched.") return 1 posting_by_triple, posting_by_account = _build_posting_indexes(posting_rows) candidates = _parse_metadata_candidates(metadata_path) joined_evidence: list[dict[str, Any]] = [] dimensions_found: set[str] = set() slots_found: set[str] = set() scanned_sets = 0 for candidate in candidates: entity_set = candidate["entity_set"] recorder_type = _derive_recorder_type(entity_set) select_fields = _derive_select_fields( candidate["subconto_fields"], candidate["subconto_type_fields"], candidate["available_props"], ) line_rows = _safe_read(client, entity_set, select_fields=select_fields, top=600) if not line_rows: continue scanned_sets += 1 for line in line_rows: doc_key = line.get("Ref_Key") if not isinstance(doc_key, str) or not doc_key: continue line_key = _to_line_key(line.get("LineNumber")) if not line_key: continue posting = posting_by_triple.get((doc_key, recorder_type, line_key)) if not posting: continue record = { "entity_set": entity_set, "document_key": doc_key, "line_number": line_key, "recorder_type": recorder_type, "account_dr_key": posting.get("AccountDr_Key"), "account_cr_key": posting.get("AccountCr_Key"), "subconto": [], } for field_name in candidate["subconto_fields"]: value = line.get(field_name) if value in (None, ""): continue inferred_type = line.get(f"{field_name}_Type") if inferred_type in (None, ""): for type_field in candidate["subconto_type_fields"]: if type_field.startswith(field_name): inferred_type = line.get(type_field) if inferred_type not in (None, ""): break category = _categorize_type(inferred_type) slot = _slot_id(field_name) slots_found.add(slot) if category: dimensions_found.add(category) record["subconto"].append( { "slot": slot, "field": field_name, "value": value, "type": inferred_type, "category": category, } ) if record["subconto"]: joined_evidence.append(record) if {"counterparty", "contract", "item"}.issubset(dimensions_found) and {"1", "2", "3"}.issubset(slots_found): break if {"counterparty", "contract", "item"}.issubset(dimensions_found) and {"1", "2", "3"}.issubset(slots_found): break # Check 1: document -> posting -> debit/credit account check1_pass = any( isinstance(item.get("account_dr_key"), str) and item.get("account_dr_key") and isinstance(item.get("account_cr_key"), str) and item.get("account_cr_key") for item in joined_evidence ) check1_sample = joined_evidence[0] if joined_evidence else None # Check 2: posting -> subconto[1..3] -> counterparty/contract/item required_dimensions = {"counterparty", "contract", "item"} required_slots = {"1", "2", "3"} slot3_recon_summary = _load_slot3_recon_summary() if slot3_recon_summary["rows_with_joined_slot3_total"] > 0: slots_found.add("3") check2_pass = required_dimensions.issubset(dimensions_found) and required_slots.issubset(slots_found) # Check 3: explain one real saldo via movements account_stats: dict[str, dict[str, Any]] = {} for account_key, rows in posting_by_account.items(): saldo = Decimal("0") debit_turnover = Decimal("0") credit_turnover = Decimal("0") for row in rows: amount = _to_decimal(row.get("Сумма")) if row.get("AccountDr_Key") == account_key: debit_turnover += amount saldo += amount if row.get("AccountCr_Key") == account_key: credit_turnover += amount saldo -= amount account_stats[account_key] = { "movement_count": len(rows), "debit_turnover": debit_turnover, "credit_turnover": credit_turnover, "saldo": saldo, } chosen_account = None chosen_account_stat: dict[str, Any] | None = None best_score = Decimal("0") for account_key, stat in account_stats.items(): if stat["movement_count"] < 3: continue score = abs(stat["saldo"]) if score > best_score: best_score = score chosen_account = account_key chosen_account_stat = stat saldo_sample: list[dict[str, Any]] = [] if chosen_account: rows = posting_by_account.get(chosen_account, []) for row in rows[:25]: amount = _to_decimal(row.get("Сумма")) sign = Decimal("0") if row.get("AccountDr_Key") == chosen_account: sign += amount if row.get("AccountCr_Key") == chosen_account: sign -= amount saldo_sample.append( { "period": row.get("Period"), "recorder": row.get("Recorder"), "recorder_type": row.get("Recorder_Type"), "line_number": row.get("LineNumber"), "amount": str(amount), "account_dr_key": row.get("AccountDr_Key"), "account_cr_key": row.get("AccountCr_Key"), "delta_to_saldo": str(sign), } ) check3_pass = bool(chosen_account and chosen_account_stat and saldo_sample) report = { "generated_at": utc_now_iso(), "endpoint": settings.service_root, "checks": { "document_to_posting_to_accounts": { "status": "pass" if check1_pass else "fail", "evidence_sample": check1_sample, "joined_rows_found": len(joined_evidence), "line_sets_scanned": scanned_sets, }, "posting_to_subconto123_to_counterparty_contract_item": { "status": "pass" if check2_pass else "fail", "required_dimensions": sorted(required_dimensions), "found_dimensions": sorted(dimensions_found), "required_slots": sorted(required_slots), "found_slots": sorted(slots_found), "slot3_recon_summary": slot3_recon_summary, "evidence_sample": joined_evidence[:10], }, "saldo_explainability_from_movements": { "status": "pass" if check3_pass else "fail", "account_key": chosen_account, "movement_count": chosen_account_stat["movement_count"] if chosen_account_stat else 0, "debit_turnover": str(chosen_account_stat["debit_turnover"]) if chosen_account_stat else "0", "credit_turnover": str(chosen_account_stat["credit_turnover"]) if chosen_account_stat else "0", "saldo": str(chosen_account_stat["saldo"]) if chosen_account_stat else "0", "movement_sample": saldo_sample, }, }, } all_pass = check1_pass and check2_pass and check3_pass report["final_verdict"] = ( "OData sufficient for MVP accounting ontology" if all_pass else "Not yet sufficient for MVP accounting ontology; deeper access is justified for failed checks." ) output_path = LOGS_DIR / "deep_accounting_mvp_gate.json" output_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8") print(f"[ok] saved: {output_path}") print( "[ok] checks: " f"doc->posting->accounts={'pass' if check1_pass else 'fail'}, " f"posting->subconto123={'pass' if check2_pass else 'fail'}, " f"saldo_explainability={'pass' if check3_pass else 'fail'}" ) print(f"[ok] verdict: {report['final_verdict']}") return 0 if all_pass else 2 if __name__ == "__main__": raise SystemExit(main())