from __future__ import annotations import json from pathlib import Path from typing import Any import xml.etree.ElementTree as ET import sys PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from config.client import ODataClient, utc_now_iso from config.settings import LOGS_DIR, load_settings def _tag_ends(tag: str, suffix: str) -> bool: return tag.endswith(suffix) def _parse_metadata(metadata_path: Path) -> tuple[dict[str, list[str]], dict[str, str]]: root = ET.fromstring(metadata_path.read_text(encoding="utf-8")) entity_type_props: dict[str, list[str]] = {} for entity_type in root.iter(): if not _tag_ends(entity_type.tag, "EntityType"): continue name = entity_type.attrib.get("Name", "") if not name: continue props: list[str] = [] for child in entity_type: if _tag_ends(child.tag, "Property"): prop_name = child.attrib.get("Name", "") if prop_name: props.append(prop_name) entity_type_props[name] = props entity_set_to_type: dict[str, str] = {} for entity_set in root.iter(): if not _tag_ends(entity_set.tag, "EntitySet"): continue set_name = entity_set.attrib.get("Name", "") et = entity_set.attrib.get("EntityType", "") if not set_name or not et: continue entity_type_name = et.split(".")[-1] entity_set_to_type[set_name] = entity_type_name return entity_type_props, entity_set_to_type def _entity_sets_with_subconto( entity_type_props: dict[str, list[str]], entity_set_to_type: dict[str, str], ) -> list[dict[str, Any]]: results: list[dict[str, Any]] = [] for entity_set, entity_type in entity_set_to_type.items(): props = entity_type_props.get(entity_type, []) subconto_props = [p for p in props if "Субконто" in p or "Subconto" in p] if not subconto_props: continue account_props = [p for p in props if p.startswith("Account") or "Счет" in p] results.append( { "entity_set": entity_set, "entity_type": entity_type, "subconto_properties": subconto_props, "account_properties": account_props, "has_account_and_subconto": bool(account_props), } ) results.sort(key=lambda x: x["entity_set"].lower()) return results def _safe_read_selected( client: ODataClient, entity_set: str, select_fields: list[str] | None = None, top: int = 20, ) -> dict[str, Any]: params: dict[str, Any] = {} if select_fields: params["$select"] = ",".join(select_fields) try: response = client.read_entity_set(entity_set, top=top, extra_params=params or None) payload = response.payload rows = payload.get("value") if rows is None and isinstance(payload.get("d"), dict): rows = payload["d"].get("results") if rows is None: rows = [] if not isinstance(rows, list): rows = [rows] return {"status": "ok", "rows": rows} except Exception as exc: return {"status": "error", "error": str(exc), "rows": []} def _non_null_subconto_counts(rows: list[dict[str, Any]]) -> dict[str, int]: counts: dict[str, int] = {} for row in rows: for key, value in row.items(): if "Субконто" not in key and "Subconto" not in key: continue if value is None or value == "": continue counts[key] = counts.get(key, 0) + 1 return counts def _load_join_probe_override(logs_dir: Path) -> dict[str, Any]: path = logs_dir / "deep_subconto_join_probe.json" if not path.exists(): return {"available": False} try: payload = json.loads(path.read_text(encoding="utf-8-sig")) except Exception as exc: return {"available": False, "error": str(exc)} chain_a = payload.get("chain_A_status") chain_f = payload.get("chain_F_status") valid = {"derivable", "opaque"} return { "available": True, "path": str(path), "chain_A_status": chain_a if chain_a in valid else None, "chain_F_status": chain_f if chain_f in valid else None, } def main() -> int: settings = load_settings() client = ODataClient(settings) metadata_path = LOGS_DIR / "metadata.xml" if not metadata_path.exists(): print("[error] metadata.xml not found. Run fetch_metadata first.") return 1 entity_type_props, entity_set_to_type = _parse_metadata(metadata_path) subconto_sets = _entity_sets_with_subconto(entity_type_props, entity_set_to_type) # Targeted probes for A/F chains target_sets = [ "AccountingRegister_Хозрасчетный_RecordType", "AccountingRegister_Хозрасчетный", "ChartOfAccounts_Хозрасчетный", "ChartOfCharacteristicTypes_ВидыСубконтоХозрасчетные", "Document_ОперацияБух", "Document_ОперацияБух_ТаблицаРегистровБухгалтерии", "Document_РеализацияТоваровУслуг", "Document_ПоступлениеТоваровУслуг", ] target_results: list[dict[str, Any]] = [] for entity_set in target_sets: if entity_set not in entity_set_to_type: target_results.append( { "entity_set": entity_set, "status": "missing_in_metadata", } ) continue et = entity_set_to_type[entity_set] props = entity_type_props.get(et, []) key_fields = [ p for p in props if p in {"Ref_Key", "Recorder", "Recorder_Type", "AccountDr_Key", "AccountCr_Key", "Организация_Key"} or "Субконто" in p ] probe = _safe_read_selected(client, entity_set, select_fields=key_fields or None, top=30) rows = probe.get("rows", []) target_results.append( { "entity_set": entity_set, "entity_type": et, "status": probe.get("status"), "error": probe.get("error"), "rows_fetched": len(rows), "selected_fields": key_fields, "non_null_subconto_counts": _non_null_subconto_counts(rows), "sample_rows": rows[:3], } ) # Chain A/F compact verdict helpers rec_type = next((x for x in target_results if x["entity_set"] == "AccountingRegister_Хозрасчетный_RecordType"), None) chart_accounts = next((x for x in target_results if x["entity_set"] == "ChartOfAccounts_Хозрасчетный"), None) chart_subconto = next((x for x in target_results if x["entity_set"] == "ChartOfCharacteristicTypes_ВидыСубконтоХозрасчетные"), None) op_buh_tbl = next((x for x in target_results if x["entity_set"] == "Document_ОперацияБух_ТаблицаРегистровБухгалтерии"), None) chain_a_status = "opaque" if rec_type and rec_type.get("status") == "ok" and rec_type.get("rows_fetched", 0) > 0: if rec_type.get("non_null_subconto_counts"): chain_a_status = "derivable" elif op_buh_tbl and op_buh_tbl.get("non_null_subconto_counts"): chain_a_status = "derivable" chain_f_status = "opaque" if ( chart_accounts and chart_accounts.get("status") == "ok" and chart_accounts.get("rows_fetched", 0) > 0 and chart_subconto and chart_subconto.get("status") == "ok" and chart_subconto.get("rows_fetched", 0) > 0 and rec_type and rec_type.get("status") == "ok" and rec_type.get("rows_fetched", 0) > 0 ): if rec_type.get("non_null_subconto_counts"): chain_f_status = "derivable" join_probe_override = _load_join_probe_override(LOGS_DIR) if join_probe_override.get("available"): override_a = join_probe_override.get("chain_A_status") override_f = join_probe_override.get("chain_F_status") if override_a == "derivable": chain_a_status = "derivable" if override_f == "derivable": chain_f_status = "derivable" report = { "generated_at": utc_now_iso(), "endpoint": settings.service_root, "entity_sets_with_subconto_total": len(subconto_sets), "entity_sets_with_subconto": subconto_sets, "targeted_results": target_results, "chain_assessment": { "A_document_to_posting_account_subconto": chain_a_status, "F_chart_to_subconto_to_posting": chain_f_status, }, "override_from_join_probe": join_probe_override, } out_path = LOGS_DIR / "deep_subconto_probe.json" out_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8") print(f"[ok] saved: {out_path}") print(f"[ok] entity_sets_with_subconto_total={len(subconto_sets)}") print( "[ok] chain A=" + report["chain_assessment"]["A_document_to_posting_account_subconto"] + ", chain F=" + report["chain_assessment"]["F_chart_to_subconto_to_posting"] ) return 0 if __name__ == "__main__": raise SystemExit(main())