NODEDC_1C/scripts/deep_probe_accounting_mvp_g...

474 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from collections import defaultdict
from decimal import Decimal, InvalidOperation
import json
from pathlib import Path
import re
from typing import Any
import sys
import xml.etree.ElementTree as ET
# Directory one level above the folder that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Put it at the front of sys.path (once) before the `config.*` imports below —
# presumably so the script can be launched directly from any working directory.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from config.client import ODataClient, utc_now_iso
from config.settings import LOGS_DIR, load_settings
# OData entity set that main() reads posting rows from.
POSTING_ENTITY_SET = "AccountingRegister_Хозрасчетный_RecordType"
# Columns requested via $select for each posting row; main() joins on
# (Recorder, Recorder_Type, LineNumber) and aggregates "Сумма" per account key.
POSTING_FIELDS = [
    "Recorder",
    "Recorder_Type",
    "LineNumber",
    "Period",
    "Организация_Key",
    "AccountDr_Key",
    "AccountCr_Key",
    "Сумма",
]
def _extract_rows(payload: dict[str, Any]) -> list[dict[str, Any]]:
rows = payload.get("value")
if rows is None and isinstance(payload.get("d"), dict):
rows = payload["d"].get("results")
if rows is None:
return []
if isinstance(rows, list):
return rows
return [rows]
def _safe_read(
    client: ODataClient,
    entity_set: str,
    *,
    select_fields: list[str],
    top: int = 200,
) -> list[dict[str, Any]]:
    """Read up to ``top`` rows of ``entity_set`` without ever raising.

    ``select_fields`` is sent as a comma-joined ``$select`` parameter. Any
    exception raised while reading or unpacking the response is reported on
    stdout (exception class name only) and turned into an empty list.
    """
    query: dict[str, Any] = {"$select": ",".join(select_fields)}
    try:
        result = client.read_entity_set(entity_set, top=top, extra_params=query)
        rows = _extract_rows(result.payload)
    except Exception as exc:
        print(f"[warn] read failed for {entity_set}: {exc.__class__.__name__}")
        rows = []
    return rows
def _to_decimal(value: Any) -> Decimal:
if value is None:
return Decimal("0")
if isinstance(value, Decimal):
return value
if isinstance(value, (int, float)):
return Decimal(str(value))
raw = str(value).strip().replace(",", ".")
if not raw:
return Decimal("0")
try:
return Decimal(raw)
except InvalidOperation:
return Decimal("0")
def _to_line_key(value: Any) -> str:
if value is None:
return ""
return str(value)
def _parse_metadata_candidates(metadata_path: Path) -> list[dict[str, Any]]:
root = ET.fromstring(metadata_path.read_text(encoding="utf-8"))
entity_type_props: dict[str, list[str]] = {}
for node in root.iter():
if not node.tag.endswith("EntityType"):
continue
name = node.attrib.get("Name", "")
if not name:
continue
props = [
child.attrib.get("Name", "")
for child in node
if child.tag.endswith("Property") and child.attrib.get("Name")
]
entity_type_props[name] = props
candidates: list[dict[str, Any]] = []
for node in root.iter():
if not node.tag.endswith("EntitySet"):
continue
set_name = node.attrib.get("Name", "")
entity_type_full = node.attrib.get("EntityType", "")
if not set_name or not entity_type_full:
continue
entity_type_name = entity_type_full.split(".")[-1]
props = entity_type_props.get(entity_type_name, [])
if not set_name.startswith("Document_"):
continue
if "Ref_Key" not in props or "LineNumber" not in props:
continue
subconto_fields = [
p
for p in props
if ("Субконто" in p or "Subconto" in p)
and not p.endswith("_Type")
]
if not subconto_fields:
continue
type_fields = [
p
for p in props
if ("Субконто" in p or "Subconto" in p)
and p.endswith("_Type")
]
candidates.append(
{
"entity_set": set_name,
"subconto_fields": subconto_fields,
"subconto_type_fields": type_fields,
"available_props": props,
}
)
return sorted(candidates, key=lambda item: item["entity_set"].lower())
def _derive_recorder_type(entity_set: str) -> str:
if "_" not in entity_set:
return f"StandardODATA.{entity_set}"
base_doc = entity_set.rsplit("_", 1)[0]
return f"StandardODATA.{base_doc}"
def _derive_select_fields(
subconto_fields: list[str],
subconto_type_fields: list[str],
available_props: list[str],
) -> list[str]:
allowed = set(available_props)
common = {"Ref_Key", "LineNumber"}
relation_hints = [
"Контрагент_Key",
"ДоговорКонтрагента_Key",
"Номенклатура_Key",
"Контрагент",
"ДоговорКонтрагента",
"Номенклатура",
]
for name in relation_hints:
if name in allowed:
common.add(name)
for name in subconto_fields:
common.add(name)
type_name = f"{name}_Type"
if type_name in allowed:
common.add(type_name)
for name in subconto_type_fields:
common.add(name)
return sorted(common)
def _categorize_type(raw: Any) -> str | None:
value = str(raw or "")
lowered = value.lower()
if (
"договор" in lowered
or "contract" in lowered
or "äîãîâîð" in lowered
):
return "contract"
if (
"контрагент" in lowered
or "counterparty" in lowered
or "êîíòðàãåíò" in lowered
):
return "counterparty"
if (
"номенклатур" in lowered
or "item" in lowered
or "nomencl" in lowered
or "íîìåíêëàòóð" in lowered
):
return "item"
return None
def _slot_id(field_name: str) -> str:
match = re.search(r"(\d+)(?:_Type)?$", field_name)
if match:
return match.group(1)
return "1"
def _build_posting_indexes(
    posting_rows: list[dict[str, Any]],
) -> tuple[dict[tuple[str, str, str], dict[str, Any]], dict[str, list[dict[str, Any]]]]:
    """Index posting rows for the join and for per-account aggregation.

    Args:
        posting_rows: Posting rows as returned by the register read.

    Returns:
        A pair ``(by_triple, by_account)``:

        * ``by_triple`` maps ``(Recorder, Recorder_Type, line key)`` to the
          posting row. Rows lacking a string recorder/recorder type or a line
          number are skipped; on duplicate keys the last row wins.
        * ``by_account`` maps each non-empty string account key to the rows
          that debit or credit it. A row whose debit and credit account are
          the same key is listed once, so consumers that test both sides of
          every listed row do not count it twice.
    """
    by_triple: dict[tuple[str, str, str], dict[str, Any]] = {}
    by_account: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for row in posting_rows:
        recorder = row.get("Recorder")
        recorder_type = row.get("Recorder_Type")
        line_key = _to_line_key(row.get("LineNumber"))
        if isinstance(recorder, str) and isinstance(recorder_type, str) and line_key:
            by_triple[(recorder, recorder_type, line_key)] = row

        dr = row.get("AccountDr_Key")
        cr = row.get("AccountCr_Key")
        if isinstance(dr, str) and dr:
            by_account[dr].append(row)
        # Skip the credit side when it is the same account as the debit side:
        # the row is already in that account's list.
        if isinstance(cr, str) and cr and cr != dr:
            by_account[cr].append(row)
    return by_triple, by_account
def _coerce_count(value: Any) -> int:
    """Convert a report counter to ``int``; missing or junk values become 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def _load_slot3_recon_summary() -> dict[str, Any]:
    """Load the slot-3 totals from ``slot3_recon_report.json`` in ``LOGS_DIR``.

    This is a best-effort read that never raises for a missing, unreadable or
    malformed report.

    Returns:
        A dict with the keys ``report_found``,
        ``rows_with_non_null_slot3_total`` and ``rows_with_joined_slot3_total``.
        ``report_found`` is ``False`` (with zero totals) when the file is
        absent, cannot be read or decoded, or does not hold a JSON object.
        Otherwise it is ``True`` and each total is read from the report's
        ``totals`` object, with absent or non-numeric counters reported as 0.
    """
    summary: dict[str, Any] = {
        "report_found": False,
        "rows_with_non_null_slot3_total": 0,
        "rows_with_joined_slot3_total": 0,
    }
    report_path = LOGS_DIR / "slot3_recon_report.json"
    if not report_path.exists():
        return summary
    try:
        payload = json.loads(report_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"[warn] slot3 recon report unreadable: {exc.__class__.__name__}")
        return summary
    if not isinstance(payload, dict):
        print("[warn] slot3 recon report is not a JSON object; ignoring it.")
        return summary

    totals = payload.get("totals")
    if not isinstance(totals, dict):
        totals = {}
    summary["report_found"] = True
    for key in ("rows_with_non_null_slot3_total", "rows_with_joined_slot3_total"):
        summary[key] = _coerce_count(totals.get(key))
    return summary
def main() -> int:
    """Run the three OData gate checks and write a JSON report.

    Steps:
      1. Read posting rows and index them by (recorder, recorder type, line)
         and by account key.
      2. For every candidate document line table found in ``metadata.xml``,
         join its rows to postings and collect the subconto values found.
      3. Evaluate check 1 (document -> posting -> accounts), check 2
         (subconto slots 1..3 -> counterparty/contract/item) and check 3
         (one account's saldo explained by its movements).
      4. Write ``deep_accounting_mvp_gate.json`` into ``LOGS_DIR`` and print
         a summary.

    Returns:
        ``0`` when all three checks pass, ``2`` when at least one fails, and
        ``1`` when ``metadata.xml`` is missing or no posting rows were read.
    """
    settings = load_settings()
    client = ODataClient(settings)
    metadata_path = LOGS_DIR / "metadata.xml"
    if not metadata_path.exists():
        print("[error] metadata.xml not found. Run probe first.")
        return 1
    # Every check below is built on posting rows, so an empty read is fatal.
    posting_rows = _safe_read(
        client,
        POSTING_ENTITY_SET,
        select_fields=POSTING_FIELDS,
        top=8000,
    )
    if not posting_rows:
        print("[error] no posting rows fetched.")
        return 1
    posting_by_triple, posting_by_account = _build_posting_indexes(posting_rows)
    candidates = _parse_metadata_candidates(metadata_path)
    joined_evidence: list[dict[str, Any]] = []
    dimensions_found: set[str] = set()
    slots_found: set[str] = set()
    scanned_sets = 0
    for candidate in candidates:
        entity_set = candidate["entity_set"]
        recorder_type = _derive_recorder_type(entity_set)
        select_fields = _derive_select_fields(
            candidate["subconto_fields"],
            candidate["subconto_type_fields"],
            candidate["available_props"],
        )
        line_rows = _safe_read(client, entity_set, select_fields=select_fields, top=600)
        if not line_rows:
            continue
        # Only line tables that actually returned rows count as scanned.
        scanned_sets += 1
        for line in line_rows:
            doc_key = line.get("Ref_Key")
            if not isinstance(doc_key, str) or not doc_key:
                continue
            line_key = _to_line_key(line.get("LineNumber"))
            if not line_key:
                continue
            # Join the document line to its posting on the same triple the
            # posting index is keyed by.
            posting = posting_by_triple.get((doc_key, recorder_type, line_key))
            if not posting:
                continue
            record = {
                "entity_set": entity_set,
                "document_key": doc_key,
                "line_number": line_key,
                "recorder_type": recorder_type,
                "account_dr_key": posting.get("AccountDr_Key"),
                "account_cr_key": posting.get("AccountCr_Key"),
                "subconto": [],
            }
            for field_name in candidate["subconto_fields"]:
                value = line.get(field_name)
                if value in (None, ""):
                    continue
                # Prefer the exact "<field>_Type" companion; otherwise fall
                # back to the first non-empty type column whose name starts
                # with the field name.
                inferred_type = line.get(f"{field_name}_Type")
                if inferred_type in (None, ""):
                    for type_field in candidate["subconto_type_fields"]:
                        if type_field.startswith(field_name):
                            inferred_type = line.get(type_field)
                            if inferred_type not in (None, ""):
                                break
                category = _categorize_type(inferred_type)
                slot = _slot_id(field_name)
                slots_found.add(slot)
                if category:
                    dimensions_found.add(category)
                record["subconto"].append(
                    {
                        "slot": slot,
                        "field": field_name,
                        "value": value,
                        "type": inferred_type,
                        "category": category,
                    }
                )
            if record["subconto"]:
                joined_evidence.append(record)
            # Stop scanning rows as soon as every required dimension and slot
            # has been observed.
            if {"counterparty", "contract", "item"}.issubset(dimensions_found) and {"1", "2", "3"}.issubset(slots_found):
                break
        # Same condition again, this time to leave the candidate loop.
        # NOTE(review): slot "3" credited from the recon report (below) is not
        # known yet at this point, so it cannot trigger the early exit.
        if {"counterparty", "contract", "item"}.issubset(dimensions_found) and {"1", "2", "3"}.issubset(slots_found):
            break
    # Check 1: document -> posting -> debit/credit account
    check1_pass = any(
        isinstance(item.get("account_dr_key"), str)
        and item.get("account_dr_key")
        and isinstance(item.get("account_cr_key"), str)
        and item.get("account_cr_key")
        for item in joined_evidence
    )
    check1_sample = joined_evidence[0] if joined_evidence else None
    # Check 2: posting -> subconto[1..3] -> counterparty/contract/item
    required_dimensions = {"counterparty", "contract", "item"}
    required_slots = {"1", "2", "3"}
    slot3_recon_summary = _load_slot3_recon_summary()
    # A separate recon report with joined slot-3 rows is accepted as evidence
    # that slot 3 exists even when this run did not observe it directly.
    if slot3_recon_summary["rows_with_joined_slot3_total"] > 0:
        slots_found.add("3")
    check2_pass = required_dimensions.issubset(dimensions_found) and required_slots.issubset(slots_found)
    # Check 3: explain one real saldo via movements
    account_stats: dict[str, dict[str, Any]] = {}
    for account_key, rows in posting_by_account.items():
        saldo = Decimal("0")
        debit_turnover = Decimal("0")
        credit_turnover = Decimal("0")
        for row in rows:
            amount = _to_decimal(row.get("Сумма"))
            # Debit movements increase the saldo, credit movements decrease it.
            if row.get("AccountDr_Key") == account_key:
                debit_turnover += amount
                saldo += amount
            if row.get("AccountCr_Key") == account_key:
                credit_turnover += amount
                saldo -= amount
        account_stats[account_key] = {
            "movement_count": len(rows),
            "debit_turnover": debit_turnover,
            "credit_turnover": credit_turnover,
            "saldo": saldo,
        }
    # Pick the account with the largest absolute saldo among those with at
    # least three movements. The comparison is strict against an initial 0,
    # so an account whose saldo is exactly zero is never chosen.
    chosen_account = None
    chosen_account_stat: dict[str, Any] | None = None
    best_score = Decimal("0")
    for account_key, stat in account_stats.items():
        if stat["movement_count"] < 3:
            continue
        score = abs(stat["saldo"])
        if score > best_score:
            best_score = score
            chosen_account = account_key
            chosen_account_stat = stat
    saldo_sample: list[dict[str, Any]] = []
    if chosen_account:
        rows = posting_by_account.get(chosen_account, [])
        # Only the first 25 movements are included in the report sample.
        for row in rows[:25]:
            amount = _to_decimal(row.get("Сумма"))
            # Signed contribution of this movement to the chosen account's saldo.
            sign = Decimal("0")
            if row.get("AccountDr_Key") == chosen_account:
                sign += amount
            if row.get("AccountCr_Key") == chosen_account:
                sign -= amount
            saldo_sample.append(
                {
                    "period": row.get("Period"),
                    "recorder": row.get("Recorder"),
                    "recorder_type": row.get("Recorder_Type"),
                    "line_number": row.get("LineNumber"),
                    "amount": str(amount),
                    "account_dr_key": row.get("AccountDr_Key"),
                    "account_cr_key": row.get("AccountCr_Key"),
                    "delta_to_saldo": str(sign),
                }
            )
    check3_pass = bool(chosen_account and chosen_account_stat and saldo_sample)
    # Decimal values are stringified because json.dumps cannot serialize Decimal.
    report = {
        "generated_at": utc_now_iso(),
        "endpoint": settings.service_root,
        "checks": {
            "document_to_posting_to_accounts": {
                "status": "pass" if check1_pass else "fail",
                "evidence_sample": check1_sample,
                "joined_rows_found": len(joined_evidence),
                "line_sets_scanned": scanned_sets,
            },
            "posting_to_subconto123_to_counterparty_contract_item": {
                "status": "pass" if check2_pass else "fail",
                "required_dimensions": sorted(required_dimensions),
                "found_dimensions": sorted(dimensions_found),
                "required_slots": sorted(required_slots),
                "found_slots": sorted(slots_found),
                "slot3_recon_summary": slot3_recon_summary,
                "evidence_sample": joined_evidence[:10],
            },
            "saldo_explainability_from_movements": {
                "status": "pass" if check3_pass else "fail",
                "account_key": chosen_account,
                "movement_count": chosen_account_stat["movement_count"] if chosen_account_stat else 0,
                "debit_turnover": str(chosen_account_stat["debit_turnover"]) if chosen_account_stat else "0",
                "credit_turnover": str(chosen_account_stat["credit_turnover"]) if chosen_account_stat else "0",
                "saldo": str(chosen_account_stat["saldo"]) if chosen_account_stat else "0",
                "movement_sample": saldo_sample,
            },
        },
    }
    all_pass = check1_pass and check2_pass and check3_pass
    report["final_verdict"] = (
        "OData sufficient for MVP accounting ontology"
        if all_pass
        else "Not yet sufficient for MVP accounting ontology; deeper access is justified for failed checks."
    )
    output_path = LOGS_DIR / "deep_accounting_mvp_gate.json"
    output_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"[ok] saved: {output_path}")
    print(
        "[ok] checks: "
        f"doc->posting->accounts={'pass' if check1_pass else 'fail'}, "
        f"posting->subconto123={'pass' if check2_pass else 'fail'}, "
        f"saldo_explainability={'pass' if check3_pass else 'fail'}"
    )
    print(f"[ok] verdict: {report['final_verdict']}")
    # Exit code 2 distinguishes "ran, but a check failed" from the setup
    # failures above, which return 1.
    return 0 if all_pass else 2
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())