NODEDC_1C/scripts/deep_probe_subconto_join.py

229 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import sys
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from config.client import ODataClient, utc_now_iso
from config.settings import LOGS_DIR, load_settings
LINES_ENTITY_SET = "Document_РеализацияТоваровУслуг_Товары"
POSTING_ENTITY_SET = "AccountingRegister_Хозрасчетный_RecordType"
RECORDER_TYPE = "StandardODATA.Document_РеализацияТоваровУслуг"
LINE_FIELDS = ["Ref_Key", "LineNumber", "СчетУчета_Key", "Субконто", "Субконто_Type"]
POSTING_FIELDS = ["Recorder", "Recorder_Type", "LineNumber", "AccountDr_Key", "AccountCr_Key"]
def _extract_rows(payload: dict[str, Any]) -> list[dict[str, Any]]:
rows = payload.get("value")
if rows is None and isinstance(payload.get("d"), dict):
rows = payload["d"].get("results")
if rows is None:
return []
if isinstance(rows, list):
return rows
return [rows]
def _safe_read(
client: ODataClient,
entity_set: str,
*,
select_fields: list[str],
top: int = 200,
filter_expr: str | None = None,
) -> list[dict[str, Any]]:
params: dict[str, Any] = {"$select": ",".join(select_fields)}
if filter_expr:
params["$filter"] = filter_expr
try:
response = client.read_entity_set(entity_set, top=top, extra_params=params)
return _extract_rows(response.payload)
except Exception as exc:
print(f"[warn] read failed for {entity_set} (filter={filter_expr!r}): {exc}")
return []
def _group_lines_by_document(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
grouped: dict[str, list[dict[str, Any]]] = {}
for row in rows:
ref_key = row.get("Ref_Key")
if not isinstance(ref_key, str) or not ref_key:
continue
grouped.setdefault(ref_key, []).append(row)
return grouped
def _pick_document_for_probe(grouped: dict[str, list[dict[str, Any]]]) -> tuple[str | None, list[dict[str, Any]]]:
best_doc: str | None = None
best_lines: list[dict[str, Any]] = []
best_score = -1
for doc_key, rows in grouped.items():
score = sum(1 for row in rows if row.get("Субконто_Type"))
if score > best_score:
best_score = score
best_doc = doc_key
best_lines = rows
return best_doc, best_lines
def _to_line_key(value: Any) -> str:
if value is None:
return ""
return str(value)
def main() -> int:
settings = load_settings()
client = ODataClient(settings)
line_rows = _safe_read(
client,
LINES_ENTITY_SET,
select_fields=LINE_FIELDS,
top=3000,
)
posting_rows_all = _safe_read(
client,
POSTING_ENTITY_SET,
select_fields=POSTING_FIELDS,
top=5000,
)
if not line_rows:
print("[error] No document line rows were fetched for probe.")
return 1
if not posting_rows_all:
print("[error] No posting rows were fetched for probe.")
return 1
grouped_lines = _group_lines_by_document(line_rows)
sales_postings = [
row
for row in posting_rows_all
if row.get("Recorder_Type") == RECORDER_TYPE and isinstance(row.get("Recorder"), str)
]
postings_by_doc: dict[str, list[dict[str, Any]]] = {}
for row in sales_postings:
recorder = row.get("Recorder")
if isinstance(recorder, str) and recorder:
postings_by_doc.setdefault(recorder, []).append(row)
tested_document_key: str | None = None
selected_lines: list[dict[str, Any]] = []
posting_rows: list[dict[str, Any]] = []
best_score = -1
for doc_key, doc_postings in postings_by_doc.items():
doc_lines = grouped_lines.get(doc_key, [])
if not doc_lines:
continue
subconto_typed = sum(1 for row in doc_lines if row.get("Субконто_Type"))
score = subconto_typed * 1000 + len(doc_postings)
if score > best_score:
best_score = score
tested_document_key = doc_key
selected_lines = doc_lines
posting_rows = doc_postings
if not tested_document_key:
tested_document_key, selected_lines = _pick_document_for_probe(grouped_lines)
if not tested_document_key:
print("[error] No suitable document key found in lines/postings overlap.")
return 1
posting_rows = postings_by_doc.get(tested_document_key, [])
postings_by_line: dict[str, list[dict[str, Any]]] = {}
for row in posting_rows:
key = _to_line_key(row.get("LineNumber"))
if key:
postings_by_line.setdefault(key, []).append(row)
joined_rows: list[dict[str, Any]] = []
chart_account_subconto_fields: list[str] = []
for line in selected_lines:
line_no = _to_line_key(line.get("LineNumber"))
if not line_no:
continue
candidates = postings_by_line.get(line_no, [])
if not candidates:
continue
line_account = line.get("СчетУчета_Key")
for posting in candidates:
account_dr = posting.get("AccountDr_Key")
account_cr = posting.get("AccountCr_Key")
account_match = bool(
isinstance(line_account, str)
and line_account
and line_account in {account_dr, account_cr}
)
joined_rows.append(
{
"line_number": line_no,
"recorder": posting.get("Recorder"),
"account_dr_key": account_dr,
"account_cr_key": account_cr,
"line_account_key": line_account,
"subconto_value": line.get("Субконто"),
"subconto_type": line.get("Субконто_Type"),
"account_match": account_match,
}
)
break
chain_a_status = "derivable" if joined_rows else "opaque"
chain_f_status = (
"derivable"
if any(row.get("subconto_type") for row in joined_rows)
else "opaque"
)
report = {
"generated_at": utc_now_iso(),
"endpoint": settings.service_root,
"tested_document_key": tested_document_key,
"posting_rows_for_document": len(posting_rows),
"line_rows_for_document": len(selected_lines),
"joined_rows": len(joined_rows),
"joined_sample": joined_rows[:10],
"chart_account_subconto_fields": chart_account_subconto_fields,
"chain_A_status": chain_a_status,
"chain_F_status": chain_f_status,
"chain_F_note": (
"Derivable by data-driven mapping (Account in posting + Subconto_Type in linked document lines). "
"Direct normative mapping from ChartOfAccounts fields is not exposed."
),
}
output_path = LOGS_DIR / "deep_subconto_join_probe.json"
output_path.write_text(
json.dumps(report, ensure_ascii=False, indent=2),
encoding="utf-8",
)
print(f"[ok] saved: {output_path}")
print(
f"[ok] chain A={chain_a_status}, chain F={chain_f_status}, "
f"joined_rows={len(joined_rows)}"
)
if chain_a_status != "derivable" or chain_f_status != "derivable":
print("[warn] Expected derivable statuses were not reached.")
return 2
return 0
if __name__ == "__main__":
raise SystemExit(main())