from __future__ import annotations import json from pathlib import Path import re from typing import Any import sys import xml.etree.ElementTree as ET PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from config.client import ODataClient, utc_now_iso from config.settings import LOGS_DIR, load_settings POSTING_ENTITY_SET = "AccountingRegister_Хозрасчетный_RecordType" POSTING_FIELDS = ["Recorder", "Recorder_Type", "LineNumber", "AccountDr_Key", "AccountCr_Key"] def _extract_rows(payload: dict[str, Any]) -> list[dict[str, Any]]: rows = payload.get("value") if rows is None and isinstance(payload.get("d"), dict): rows = payload["d"].get("results") if rows is None: return [] if isinstance(rows, list): return rows return [rows] def _safe_read( client: ODataClient, entity_set: str, *, select_fields: list[str], extra_params: dict[str, Any] | None = None, warn_on_error: bool = True, top: int = 200, ) -> list[dict[str, Any]]: params: dict[str, Any] = {"$select": ",".join(select_fields)} if extra_params: params.update(extra_params) try: response = client.read_entity_set(entity_set, top=top, extra_params=params) return _extract_rows(response.payload) except Exception as exc: if warn_on_error: print(f"[warn] read failed for {entity_set}: {exc.__class__.__name__}") return [] def _to_line_key(value: Any) -> str: if value is None: return "" return str(value) def _derive_recorder_type(entity_set: str) -> str: if "_" not in entity_set: return f"StandardODATA.{entity_set}" base_doc = entity_set.rsplit("_", 1)[0] return f"StandardODATA.{base_doc}" def _parse_slot3_sets(metadata_path: Path) -> list[dict[str, Any]]: root = ET.fromstring(metadata_path.read_text(encoding="utf-8")) entity_type_props: dict[str, list[str]] = {} for node in root.iter(): if not node.tag.endswith("EntityType"): continue name = node.attrib.get("Name", "") if not name: continue props = [ child.attrib.get("Name", "") for child in node if child.tag.endswith("Property") and child.attrib.get("Name") ] entity_type_props[name] = props results: list[dict[str, Any]] = [] for node in root.iter(): if not node.tag.endswith("EntitySet"): continue set_name = node.attrib.get("Name", "") full_type = node.attrib.get("EntityType", "") if not set_name or not full_type: continue if not set_name.startswith("Document_"): continue type_name = full_type.split(".")[-1] props = entity_type_props.get(type_name, []) if "Ref_Key" not in props or "LineNumber" not in props: continue slot3_fields = [] for prop in props: lowered = prop.lower() if "субконто" not in lowered and "subconto" not in lowered: continue if re.search(r"3(_type)?$", prop): slot3_fields.append(prop) if slot3_fields: results.append( { "entity_set": set_name, "entity_type": type_name, "all_props": props, "slot3_fields": sorted(slot3_fields), "recorder_type": _derive_recorder_type(set_name), } ) results.sort(key=lambda x: x["entity_set"].lower()) return results def main() -> int: settings = load_settings() client = ODataClient(settings) metadata_path = LOGS_DIR / "metadata.xml" if not metadata_path.exists(): print("[error] metadata.xml not found. Run probe first.") return 1 posting_rows = _safe_read( client, POSTING_ENTITY_SET, select_fields=POSTING_FIELDS, top=20000, ) posting_index: dict[tuple[str, str, str], dict[str, Any]] = {} for row in posting_rows: recorder = row.get("Recorder") recorder_type = row.get("Recorder_Type") line = _to_line_key(row.get("LineNumber")) if isinstance(recorder, str) and isinstance(recorder_type, str) and line: posting_index[(recorder, recorder_type, line)] = row slot3_sets = _parse_slot3_sets(metadata_path) per_set_reports: list[dict[str, Any]] = [] totals = { "sets_with_slot3_fields": len(slot3_sets), "sets_with_data_rows": 0, "sets_with_non_null_slot3": 0, "sets_with_joined_slot3_rows": 0, "rows_with_non_null_slot3_total": 0, "rows_with_joined_slot3_total": 0, } for item in slot3_sets: entity_set = item["entity_set"] recorder_type = item["recorder_type"] slot3_fields = item["slot3_fields"] select_fields = ["Ref_Key", "LineNumber"] + slot3_fields baseline_rows = _safe_read( client, entity_set, select_fields=select_fields, top=5000, ) filtered_by_field: dict[str, list[dict[str, Any]]] = {} for field in slot3_fields: filtered_rows = _safe_read( client, entity_set, select_fields=select_fields, extra_params={"$filter": f"{field} ne null"}, warn_on_error=False, top=5000, ) filtered_by_field[field] = filtered_rows non_null_rows = 0 joined_rows = 0 per_field_non_null: dict[str, int] = {f: 0 for f in slot3_fields} samples: list[dict[str, Any]] = [] any_filtered_rows = any(filtered_by_field.values()) if baseline_rows or any_filtered_rows: totals["sets_with_data_rows"] += 1 candidates: dict[tuple[str, str], dict[str, Any]] = {} for field, field_rows in filtered_by_field.items(): for row in field_rows: doc_key = row.get("Ref_Key") line_no = _to_line_key(row.get("LineNumber")) if not isinstance(doc_key, str) or not line_no: continue key = (doc_key, line_no) if key not in candidates: candidates[key] = row rows_to_scan = list(candidates.values()) if candidates else baseline_rows for row in rows_to_scan: has_slot3_value = False row_slot_values: dict[str, Any] = {} for field in slot3_fields: value = row.get(field) if value not in (None, ""): per_field_non_null[field] += 1 row_slot_values[field] = value has_slot3_value = True if not has_slot3_value: continue non_null_rows += 1 doc_key = row.get("Ref_Key") line_no = _to_line_key(row.get("LineNumber")) posting = None if isinstance(doc_key, str) and line_no: posting = posting_index.get((doc_key, recorder_type, line_no)) if posting: joined_rows += 1 if len(samples) < 5: samples.append( { "document_key": doc_key, "line_number": line_no, "recorder_type": recorder_type, "slot3_values": row_slot_values, "account_dr_key": posting.get("AccountDr_Key"), "account_cr_key": posting.get("AccountCr_Key"), } ) if non_null_rows > 0: totals["sets_with_non_null_slot3"] += 1 if joined_rows > 0: totals["sets_with_joined_slot3_rows"] += 1 totals["rows_with_non_null_slot3_total"] += non_null_rows totals["rows_with_joined_slot3_total"] += joined_rows per_set_reports.append( { "entity_set": entity_set, "recorder_type": recorder_type, "rows_fetched_baseline": len(baseline_rows), "rows_fetched_by_filter": {k: len(v) for k, v in filtered_by_field.items()}, "slot3_fields": slot3_fields, "slot3_field_non_null_counts": per_field_non_null, "non_null_slot3_rows": non_null_rows, "joined_slot3_rows": joined_rows, "join_rate": round(joined_rows / non_null_rows, 4) if non_null_rows else 0.0, "samples": samples, } ) per_set_reports.sort( key=lambda x: ( x["joined_slot3_rows"] == 0, -x["joined_slot3_rows"], -x["non_null_slot3_rows"], x["entity_set"].lower(), ) ) report = { "generated_at": utc_now_iso(), "endpoint": settings.service_root, "totals": totals, "slot3_recon": per_set_reports, } output_path = LOGS_DIR / "slot3_recon_report.json" output_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8") print(f"[ok] saved: {output_path}") print( "[ok] slot3 summary: " f"sets={totals['sets_with_slot3_fields']}, " f"sets_with_non_null={totals['sets_with_non_null_slot3']}, " f"sets_with_joined={totals['sets_with_joined_slot3_rows']}, " f"rows_non_null={totals['rows_with_non_null_slot3_total']}, " f"rows_joined={totals['rows_with_joined_slot3_total']}" ) return 0 if __name__ == "__main__": raise SystemExit(main())