NODEDC_1C/scripts/recon_slot3_gap.py

292 lines
9.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
from pathlib import Path
import re
from typing import Any
import sys
import xml.etree.ElementTree as ET
# Directory one level above the directory that contains this script
# (parents[0] is the script's own directory, parents[1] is its parent).
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Put the project root on sys.path before the `config` imports below;
# presumably this is what lets them resolve when the script is run directly.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from config.client import ODataClient, utc_now_iso
from config.settings import LOGS_DIR, load_settings
# Entity set that main() reads to build its posting index.
POSTING_ENTITY_SET = "AccountingRegister_Хозрасчетный_RecordType"
# Columns requested via $select when reading POSTING_ENTITY_SET.
POSTING_FIELDS = ["Recorder", "Recorder_Type", "LineNumber", "AccountDr_Key", "AccountCr_Key"]
def _extract_rows(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the row list carried by a response payload.

    Rows are taken from ``payload["value"]``; when that is missing (or
    ``None``) and ``payload["d"]`` is a dict, from ``payload["d"]["results"]``.

    Returns:
        ``[]`` when no rows are found, the row list itself (same object) when
        the located value is a list, otherwise the located value wrapped in a
        one-element list.
    """
    found = payload.get("value")
    if found is None:
        envelope = payload.get("d")
        found = envelope.get("results") if isinstance(envelope, dict) else None
    if found is None:
        return []
    # A single non-list value is normalised to a one-row list.
    return found if isinstance(found, list) else [found]
def _safe_read(
    client: ODataClient,
    entity_set: str,
    *,
    select_fields: list[str],
    extra_params: dict[str, Any] | None = None,
    warn_on_error: bool = True,
    top: int = 200,
) -> list[dict[str, Any]]:
    """Read rows from ``entity_set`` without ever raising on a failed read.

    Args:
        client: Object whose ``read_entity_set`` performs the request.
        entity_set: Name of the entity set to read.
        select_fields: Field names joined with commas into ``$select``.
        extra_params: Extra query parameters merged over ``$select``
            (a ``$select`` key given here overrides the generated one).
        warn_on_error: When true, print a ``[warn]`` line on failure.
        top: Row limit forwarded to ``client.read_entity_set``.

    Returns:
        The rows extracted from the response payload, or ``[]`` if the read
        or the extraction raised any ``Exception``.
    """
    query: dict[str, Any] = {
        "$select": ",".join(select_fields),
        **(extra_params or {}),
    }
    try:
        result = client.read_entity_set(entity_set, top=top, extra_params=query)
        return _extract_rows(result.payload)
    except Exception as error:
        # Deliberate best-effort: a failed read is reported (class name only)
        # and treated as "no rows" so the caller can carry on.
        if warn_on_error:
            print(f"[warn] read failed for {entity_set}: {error.__class__.__name__}")
    return []
def _to_line_key(value: Any) -> str:
    """Return ``value`` as a string key; ``None`` becomes the empty string."""
    return "" if value is None else str(value)
def _derive_recorder_type(entity_set: str) -> str:
    """Build a ``StandardODATA.<name>`` type string from an entity-set name.

    Everything from the last underscore onward is dropped; a name without
    any underscore is used unchanged.
    """
    head, separator, _suffix = entity_set.rpartition("_")
    # rpartition yields an empty separator when there is no underscore at all.
    owner = head if separator else entity_set
    return f"StandardODATA.{owner}"
def _parse_slot3_sets(metadata_path: Path) -> list[dict[str, Any]]:
    """Find ``Document_*`` entity sets whose entity type has "slot 3" fields.

    The metadata XML is scanned twice: first to collect property names per
    ``EntityType``, then to inspect every ``EntitySet``. A set is reported
    when its name starts with ``Document_``, its type has both ``Ref_Key``
    and ``LineNumber``, and at least one property name contains
    ``субконто``/``subconto`` (case-insensitive) and ends in ``3`` or
    ``3_type`` (case-insensitive, so ``...3_Type`` is included).

    Args:
        metadata_path: Path to the UTF-8 encoded metadata XML file.

    Returns:
        One dict per matching set with keys ``entity_set``, ``entity_type``,
        ``all_props``, ``slot3_fields`` (sorted) and ``recorder_type``,
        ordered case-insensitively by ``entity_set``.

    Raises:
        OSError: If the file cannot be read.
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
    """
    root = ET.fromstring(metadata_path.read_text(encoding="utf-8"))

    # The suffix test must ignore case: the name check below is done on a
    # lower-cased copy, but the suffix is matched against the original
    # spelling, where the type companion is written "..._Type".
    slot3_suffix = re.compile(r"3(_type)?$", re.IGNORECASE)

    # Pass 1: property names per entity type. Tags are compared by suffix
    # because parsed tags carry a "{namespace}" prefix.
    # NOTE(review): endswith("Property") also accepts "NavigationProperty"
    # elements; kept as-is — confirm whether that is intended.
    entity_type_props: dict[str, list[str]] = {}
    for node in root.iter():
        if not node.tag.endswith("EntityType"):
            continue
        name = node.attrib.get("Name", "")
        if not name:
            continue
        entity_type_props[name] = [
            child.attrib.get("Name", "")
            for child in node
            if child.tag.endswith("Property") and child.attrib.get("Name")
        ]

    # Pass 2: pick the entity sets that qualify.
    results: list[dict[str, Any]] = []
    for node in root.iter():
        if not node.tag.endswith("EntitySet"):
            continue
        set_name = node.attrib.get("Name", "")
        full_type = node.attrib.get("EntityType", "")
        if not set_name or not full_type:
            continue
        if not set_name.startswith("Document_"):
            continue
        # EntityType is namespace-qualified; only the last segment is the key.
        type_name = full_type.split(".")[-1]
        props = entity_type_props.get(type_name, [])
        if "Ref_Key" not in props or "LineNumber" not in props:
            continue

        slot3_fields = []
        for prop in props:
            lowered = prop.lower()
            if "субконто" not in lowered and "subconto" not in lowered:
                continue
            if slot3_suffix.search(prop):
                slot3_fields.append(prop)
        if not slot3_fields:
            continue

        results.append(
            {
                "entity_set": set_name,
                "entity_type": type_name,
                "all_props": props,
                "slot3_fields": sorted(slot3_fields),
                "recorder_type": _derive_recorder_type(set_name),
            }
        )

    results.sort(key=lambda entry: entry["entity_set"].lower())
    return results
def _build_posting_index(
    posting_rows: list[dict[str, Any]],
) -> dict[tuple[str, str, str], dict[str, Any]]:
    """Index posting rows by ``(Recorder, Recorder_Type, LineNumber)``.

    Rows whose recorder fields are not strings, or whose line key is empty,
    are skipped. When several rows share a key, the last one wins.
    """
    index: dict[tuple[str, str, str], dict[str, Any]] = {}
    for row in posting_rows:
        recorder = row.get("Recorder")
        recorder_type = row.get("Recorder_Type")
        line = _to_line_key(row.get("LineNumber"))
        if isinstance(recorder, str) and isinstance(recorder_type, str) and line:
            index[(recorder, recorder_type, line)] = row
    return index


def _dedupe_filtered_rows(
    filtered_by_field: dict[str, list[dict[str, Any]]],
) -> list[dict[str, Any]]:
    """Merge per-field filtered rows, keeping the first row per ``(Ref_Key, LineNumber)``.

    Rows without a string ``Ref_Key`` or with an empty line key are dropped.
    """
    unique: dict[tuple[str, str], dict[str, Any]] = {}
    for field_rows in filtered_by_field.values():
        for row in field_rows:
            doc_key = row.get("Ref_Key")
            line_no = _to_line_key(row.get("LineNumber"))
            if not isinstance(doc_key, str) or not line_no:
                continue
            unique.setdefault((doc_key, line_no), row)
    return list(unique.values())


def _recon_entity_set(
    client: ODataClient,
    item: dict[str, Any],
    posting_index: dict[tuple[str, str, str], dict[str, Any]],
    max_samples: int = 5,
) -> dict[str, Any]:
    """Read one entity set and build its slot-3 report entry.

    Performs one unfiltered read plus one ``<field> ne null`` read per slot-3
    field, counts rows that carry a slot-3 value, and counts how many of
    those have a matching entry in ``posting_index`` under
    ``(Ref_Key, recorder_type, LineNumber)``. Up to ``max_samples`` joined
    rows are kept as samples.
    """
    entity_set = item["entity_set"]
    recorder_type = item["recorder_type"]
    slot3_fields = item["slot3_fields"]
    select_fields = ["Ref_Key", "LineNumber"] + slot3_fields

    baseline_rows = _safe_read(
        client,
        entity_set,
        select_fields=select_fields,
        top=5000,
    )
    # Failures of the filtered reads are not reported (warn_on_error=False);
    # a failed read simply contributes no rows.
    filtered_by_field: dict[str, list[dict[str, Any]]] = {
        field: _safe_read(
            client,
            entity_set,
            select_fields=select_fields,
            extra_params={"$filter": f"{field} ne null"},
            warn_on_error=False,
            top=5000,
        )
        for field in slot3_fields
    }

    # Prefer the filtered rows; fall back to the unfiltered read only when
    # the filtered reads produced no usable row at all.
    rows_to_scan = _dedupe_filtered_rows(filtered_by_field) or baseline_rows

    non_null_rows = 0
    joined_rows = 0
    per_field_non_null: dict[str, int] = {field: 0 for field in slot3_fields}
    samples: list[dict[str, Any]] = []

    for row in rows_to_scan:
        row_slot_values: dict[str, Any] = {}
        for field in slot3_fields:
            value = row.get(field)
            if value not in (None, ""):
                per_field_non_null[field] += 1
                row_slot_values[field] = value
        if not row_slot_values:
            continue
        non_null_rows += 1

        doc_key = row.get("Ref_Key")
        line_no = _to_line_key(row.get("LineNumber"))
        posting = None
        if isinstance(doc_key, str) and line_no:
            posting = posting_index.get((doc_key, recorder_type, line_no))
        if not posting:
            continue
        joined_rows += 1
        if len(samples) < max_samples:
            samples.append(
                {
                    "document_key": doc_key,
                    "line_number": line_no,
                    "recorder_type": recorder_type,
                    "slot3_values": row_slot_values,
                    "account_dr_key": posting.get("AccountDr_Key"),
                    "account_cr_key": posting.get("AccountCr_Key"),
                }
            )

    return {
        "entity_set": entity_set,
        "recorder_type": recorder_type,
        "rows_fetched_baseline": len(baseline_rows),
        "rows_fetched_by_filter": {k: len(v) for k, v in filtered_by_field.items()},
        "slot3_fields": slot3_fields,
        "slot3_field_non_null_counts": per_field_non_null,
        "non_null_slot3_rows": non_null_rows,
        "joined_slot3_rows": joined_rows,
        "join_rate": round(joined_rows / non_null_rows, 4) if non_null_rows else 0.0,
        "samples": samples,
    }


def main() -> int:
    """Build the slot-3 reconciliation report and write it as JSON.

    Reads the posting rows, finds the slot-3 entity sets listed in
    ``LOGS_DIR / "metadata.xml"``, reconciles each set against the posting
    index, and writes ``LOGS_DIR / "slot3_recon_report.json"``.

    Returns:
        ``1`` if ``metadata.xml`` is missing, ``0`` after the report is saved.
    """
    settings = load_settings()
    client = ODataClient(settings)

    metadata_path = LOGS_DIR / "metadata.xml"
    if not metadata_path.exists():
        print("[error] metadata.xml not found. Run probe first.")
        return 1

    posting_rows = _safe_read(
        client,
        POSTING_ENTITY_SET,
        select_fields=POSTING_FIELDS,
        top=20000,
    )
    posting_index = _build_posting_index(posting_rows)

    slot3_sets = _parse_slot3_sets(metadata_path)
    totals = {
        "sets_with_slot3_fields": len(slot3_sets),
        "sets_with_data_rows": 0,
        "sets_with_non_null_slot3": 0,
        "sets_with_joined_slot3_rows": 0,
        "rows_with_non_null_slot3_total": 0,
        "rows_with_joined_slot3_total": 0,
    }

    per_set_reports: list[dict[str, Any]] = []
    for item in slot3_sets:
        set_report = _recon_entity_set(client, item, posting_index)
        per_set_reports.append(set_report)

        non_null_rows = set_report["non_null_slot3_rows"]
        joined_rows = set_report["joined_slot3_rows"]
        # A set "has data" if any of its reads returned at least one row.
        if set_report["rows_fetched_baseline"] or any(
            set_report["rows_fetched_by_filter"].values()
        ):
            totals["sets_with_data_rows"] += 1
        if non_null_rows > 0:
            totals["sets_with_non_null_slot3"] += 1
        if joined_rows > 0:
            totals["sets_with_joined_slot3_rows"] += 1
        totals["rows_with_non_null_slot3_total"] += non_null_rows
        totals["rows_with_joined_slot3_total"] += joined_rows

    # Sets with joined rows first (most joins first), then by non-null row
    # count, then alphabetically.
    per_set_reports.sort(
        key=lambda x: (
            x["joined_slot3_rows"] == 0,
            -x["joined_slot3_rows"],
            -x["non_null_slot3_rows"],
            x["entity_set"].lower(),
        )
    )

    report = {
        "generated_at": utc_now_iso(),
        "endpoint": settings.service_root,
        "totals": totals,
        "slot3_recon": per_set_reports,
    }
    output_path = LOGS_DIR / "slot3_recon_report.json"
    output_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")

    print(f"[ok] saved: {output_path}")
    print(
        "[ok] slot3 summary: "
        f"sets={totals['sets_with_slot3_fields']}, "
        f"sets_with_non_null={totals['sets_with_non_null_slot3']}, "
        f"sets_with_joined={totals['sets_with_joined_slot3_rows']}, "
        f"rows_non_null={totals['rows_with_non_null_slot3_total']}, "
        f"rows_joined={totals['rows_with_joined_slot3_total']}"
    )
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())