# NODEDC_1C/odata_probe/dump_sample_links.py
#
# 88 lines
# 2.6 KiB
# Python

from __future__ import annotations
import json
from typing import Any
from config.client import ODataClient, flatten_guid_like_fields, utc_now_iso
from config.settings import LOGS_DIR, load_settings
def _load_probe_targets() -> list[str]:
    """Return up to ten entity-set names marked "ok" in probe_report.json.

    The report is read from ``LOGS_DIR / "probe_report.json"``. An item of
    the report's ``"entities"`` list is kept when its ``"status"`` equals
    ``"ok"`` and its ``"entity_set"`` value is truthy. Names are converted
    to ``str`` and returned in report order.

    Returns:
        At most the first ten matching names, or an empty list when the
        report file does not exist.
    """
    report_path = LOGS_DIR / "probe_report.json"
    if not report_path.exists():
        return []

    report = json.loads(report_path.read_text(encoding="utf-8"))
    names: list[str] = [
        str(item["entity_set"])
        for item in report.get("entities", [])
        if item.get("status") == "ok" and item.get("entity_set")
    ]
    # Only the first ten successfully probed entity sets are sampled.
    return names[:10]
def _extract_link_map(records: list[dict[str, Any]]) -> dict[str, list[str]]:
    """Collect a small sample of distinct values per GUID-like field.

    For every record, the field names produced by
    ``flatten_guid_like_fields(record)`` are looked up on the record with
    ``record.get``; values that are ``None`` are skipped and all others are
    stringified and de-duplicated per field.

    Args:
        records: Rows to inspect.

    Returns:
        A mapping of field name to at most five of its distinct stringified
        values, taken from the start of the sorted value list. Fields that
        never had a non-``None`` value are absent.
    """
    seen: dict[str, set[str]] = {}
    for record in records:
        for name in flatten_guid_like_fields(record):
            raw = record.get(name)
            if raw is not None:
                if name not in seen:
                    seen[name] = set()
                seen[name].add(str(raw))

    # Sort for deterministic output, then keep only the first five samples.
    return {name: sorted(bucket)[:5] for name, bucket in seen.items()}
def main() -> int:
    """Sample link-field values for probed entity sets and save them as JSON.

    Loads settings, builds an ``ODataClient`` and reads the entity sets
    returned by ``_load_probe_targets()``. For each one, up to
    ``min(settings.probe_top, 5)`` records are fetched and summarised with
    ``_extract_link_map``. Any exception raised while handling one entity
    set is recorded in the output under ``"error"`` and printed as a
    warning, and processing continues with the next entity set.

    The result is written to ``LOGS_DIR / "sample_links.json"``.

    Returns:
        ``1`` when there are no probe targets (nothing is written),
        otherwise ``0``.
    """
    settings = load_settings()
    client = ODataClient(settings)

    targets = _load_probe_targets()
    if not targets:
        print(
            "[error] probe_report.json not found or empty. "
            "Run `python -m odata_probe.probe_entities` first."
        )
        return 1

    report: dict[str, Any] = {
        "generated_at": utc_now_iso(),
        "service_root": settings.service_root,
        "entities": [],
    }
    entries = report["entities"]

    for entity_set in targets:
        # Best effort per entity set: a failure is recorded, not fatal.
        try:
            sample_size = min(settings.probe_top, 5)
            rows = client.read_entity_set_records(entity_set, top=sample_size)
            entry = {
                "entity_set": entity_set,
                "records_fetched": len(rows),
                "link_field_samples": _extract_link_map(rows),
            }
            entries.append(entry)
            print(f"[ok] {entity_set}: link map generated")
        except Exception as exc:
            failure = {
                "entity_set": entity_set,
                "error": str(exc),
            }
            entries.append(failure)
            print(f"[warn] {entity_set}: {exc}")

    output_file = LOGS_DIR / "sample_links.json"
    serialized = json.dumps(report, ensure_ascii=False, indent=2)
    output_file.write_text(serialized, encoding="utf-8")
    print(f"[ok] saved link map: {output_file}")
    return 0
# Script entry point: run main() and exit with the status code it returns.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)