NODEDC_1C/scripts/deep_probe_subconto.py

258 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import xml.etree.ElementTree as ET
import sys
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from config.client import ODataClient, utc_now_iso
from config.settings import LOGS_DIR, load_settings
def _tag_ends(tag: str, suffix: str) -> bool:
return tag.endswith(suffix)
def _parse_metadata(metadata_path: Path) -> tuple[dict[str, list[str]], dict[str, str]]:
root = ET.fromstring(metadata_path.read_text(encoding="utf-8"))
entity_type_props: dict[str, list[str]] = {}
for entity_type in root.iter():
if not _tag_ends(entity_type.tag, "EntityType"):
continue
name = entity_type.attrib.get("Name", "")
if not name:
continue
props: list[str] = []
for child in entity_type:
if _tag_ends(child.tag, "Property"):
prop_name = child.attrib.get("Name", "")
if prop_name:
props.append(prop_name)
entity_type_props[name] = props
entity_set_to_type: dict[str, str] = {}
for entity_set in root.iter():
if not _tag_ends(entity_set.tag, "EntitySet"):
continue
set_name = entity_set.attrib.get("Name", "")
et = entity_set.attrib.get("EntityType", "")
if not set_name or not et:
continue
entity_type_name = et.split(".")[-1]
entity_set_to_type[set_name] = entity_type_name
return entity_type_props, entity_set_to_type
def _entity_sets_with_subconto(
entity_type_props: dict[str, list[str]],
entity_set_to_type: dict[str, str],
) -> list[dict[str, Any]]:
results: list[dict[str, Any]] = []
for entity_set, entity_type in entity_set_to_type.items():
props = entity_type_props.get(entity_type, [])
subconto_props = [p for p in props if "Субконто" in p or "Subconto" in p]
if not subconto_props:
continue
account_props = [p for p in props if p.startswith("Account") or "Счет" in p]
results.append(
{
"entity_set": entity_set,
"entity_type": entity_type,
"subconto_properties": subconto_props,
"account_properties": account_props,
"has_account_and_subconto": bool(account_props),
}
)
results.sort(key=lambda x: x["entity_set"].lower())
return results
def _safe_read_selected(
client: ODataClient,
entity_set: str,
select_fields: list[str] | None = None,
top: int = 20,
) -> dict[str, Any]:
params: dict[str, Any] = {}
if select_fields:
params["$select"] = ",".join(select_fields)
try:
response = client.read_entity_set(entity_set, top=top, extra_params=params or None)
payload = response.payload
rows = payload.get("value")
if rows is None and isinstance(payload.get("d"), dict):
rows = payload["d"].get("results")
if rows is None:
rows = []
if not isinstance(rows, list):
rows = [rows]
return {"status": "ok", "rows": rows}
except Exception as exc:
return {"status": "error", "error": str(exc), "rows": []}
def _non_null_subconto_counts(rows: list[dict[str, Any]]) -> dict[str, int]:
counts: dict[str, int] = {}
for row in rows:
for key, value in row.items():
if "Субконто" not in key and "Subconto" not in key:
continue
if value is None or value == "":
continue
counts[key] = counts.get(key, 0) + 1
return counts
def _load_join_probe_override(logs_dir: Path) -> dict[str, Any]:
path = logs_dir / "deep_subconto_join_probe.json"
if not path.exists():
return {"available": False}
try:
payload = json.loads(path.read_text(encoding="utf-8-sig"))
except Exception as exc:
return {"available": False, "error": str(exc)}
chain_a = payload.get("chain_A_status")
chain_f = payload.get("chain_F_status")
valid = {"derivable", "opaque"}
return {
"available": True,
"path": str(path),
"chain_A_status": chain_a if chain_a in valid else None,
"chain_F_status": chain_f if chain_f in valid else None,
}
def main() -> int:
settings = load_settings()
client = ODataClient(settings)
metadata_path = LOGS_DIR / "metadata.xml"
if not metadata_path.exists():
print("[error] metadata.xml not found. Run fetch_metadata first.")
return 1
entity_type_props, entity_set_to_type = _parse_metadata(metadata_path)
subconto_sets = _entity_sets_with_subconto(entity_type_props, entity_set_to_type)
# Targeted probes for A/F chains
target_sets = [
"AccountingRegister_Хозрасчетный_RecordType",
"AccountingRegister_Хозрасчетный",
"ChartOfAccounts_Хозрасчетный",
"ChartOfCharacteristicTypes_ВидыСубконтоХозрасчетные",
"Document_ОперацияБух",
"Document_ОперацияБух_ТаблицаРегистровБухгалтерии",
"Document_РеализацияТоваровУслуг",
"Document_ПоступлениеТоваровУслуг",
]
target_results: list[dict[str, Any]] = []
for entity_set in target_sets:
if entity_set not in entity_set_to_type:
target_results.append(
{
"entity_set": entity_set,
"status": "missing_in_metadata",
}
)
continue
et = entity_set_to_type[entity_set]
props = entity_type_props.get(et, [])
key_fields = [
p
for p in props
if p in {"Ref_Key", "Recorder", "Recorder_Type", "AccountDr_Key", "AccountCr_Key", "Организация_Key"}
or "Субконто" in p
]
probe = _safe_read_selected(client, entity_set, select_fields=key_fields or None, top=30)
rows = probe.get("rows", [])
target_results.append(
{
"entity_set": entity_set,
"entity_type": et,
"status": probe.get("status"),
"error": probe.get("error"),
"rows_fetched": len(rows),
"selected_fields": key_fields,
"non_null_subconto_counts": _non_null_subconto_counts(rows),
"sample_rows": rows[:3],
}
)
# Chain A/F compact verdict helpers
rec_type = next((x for x in target_results if x["entity_set"] == "AccountingRegister_Хозрасчетный_RecordType"), None)
chart_accounts = next((x for x in target_results if x["entity_set"] == "ChartOfAccounts_Хозрасчетный"), None)
chart_subconto = next((x for x in target_results if x["entity_set"] == "ChartOfCharacteristicTypes_ВидыСубконтоХозрасчетные"), None)
op_buh_tbl = next((x for x in target_results if x["entity_set"] == "Document_ОперацияБух_ТаблицаРегистровБухгалтерии"), None)
chain_a_status = "opaque"
if rec_type and rec_type.get("status") == "ok" and rec_type.get("rows_fetched", 0) > 0:
if rec_type.get("non_null_subconto_counts"):
chain_a_status = "derivable"
elif op_buh_tbl and op_buh_tbl.get("non_null_subconto_counts"):
chain_a_status = "derivable"
chain_f_status = "opaque"
if (
chart_accounts
and chart_accounts.get("status") == "ok"
and chart_accounts.get("rows_fetched", 0) > 0
and chart_subconto
and chart_subconto.get("status") == "ok"
and chart_subconto.get("rows_fetched", 0) > 0
and rec_type
and rec_type.get("status") == "ok"
and rec_type.get("rows_fetched", 0) > 0
):
if rec_type.get("non_null_subconto_counts"):
chain_f_status = "derivable"
join_probe_override = _load_join_probe_override(LOGS_DIR)
if join_probe_override.get("available"):
override_a = join_probe_override.get("chain_A_status")
override_f = join_probe_override.get("chain_F_status")
if override_a == "derivable":
chain_a_status = "derivable"
if override_f == "derivable":
chain_f_status = "derivable"
report = {
"generated_at": utc_now_iso(),
"endpoint": settings.service_root,
"entity_sets_with_subconto_total": len(subconto_sets),
"entity_sets_with_subconto": subconto_sets,
"targeted_results": target_results,
"chain_assessment": {
"A_document_to_posting_account_subconto": chain_a_status,
"F_chart_to_subconto_to_posting": chain_f_status,
},
"override_from_join_probe": join_probe_override,
}
out_path = LOGS_DIR / "deep_subconto_probe.json"
out_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"[ok] saved: {out_path}")
print(f"[ok] entity_sets_with_subconto_total={len(subconto_sets)}")
print(
"[ok] chain A="
+ report["chain_assessment"]["A_document_to_posting_account_subconto"]
+ ", chain F="
+ report["chain_assessment"]["F_chart_to_subconto_to_posting"]
)
return 0
if __name__ == "__main__":
raise SystemExit(main())