NODEDC_1C/scripts/remap_snapshot_semantic_v2.py

176 lines
6.5 KiB
Python

from __future__ import annotations
import argparse
from collections import Counter
import json
from pathlib import Path
import sys
from typing import Any
# Directory one level above the directory containing this file. It is put at
# the front of sys.path (once) so the project-local packages imported below
# resolve when this script is executed directly.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# Project-local imports: these must stay after the sys.path adjustment above.
from canonical_layer.mappers import map_record
from config.settings import LOGS_DIR
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the semantic-v2 snapshot remap.

    Args:
        argv: Arguments to parse, without the program name. The default
            ``None`` makes argparse read ``sys.argv[1:]``, so existing
            zero-argument callers behave exactly as before. Pass an explicit
            list to drive the script from code or tests.

    Returns:
        Namespace with string attributes ``input_snapshot``,
        ``output_snapshot`` and ``metrics_output``. Each defaults to a file
        under ``LOGS_DIR``.
    """
    parser = argparse.ArgumentParser(description="Remap existing snapshot with semantic-v2 mapper rules")
    parser.add_argument(
        "--input-snapshot",
        default=str(LOGS_DIR / "pre_report_snapshot_2020_2020-06.json"),
        help="Path to current snapshot json",
    )
    parser.add_argument(
        "--output-snapshot",
        default=str(LOGS_DIR / "pre_report_snapshot_2020_2020-06_semantic_v2.json"),
        help="Path to remapped snapshot json",
    )
    parser.add_argument(
        "--metrics-output",
        default=str(LOGS_DIR / "pre_report_snapshot_2020_2020-06_semantic_v2_metrics.json"),
        help="Path to before/after metrics json",
    )
    return parser.parse_args(argv)
def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 text and return the decoded JSON document."""
    raw_text = path.read_text(encoding="utf-8")
    document = json.loads(raw_text)
    return document
def write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write *payload* to *path* as 2-space-indented UTF-8 JSON.

    Missing parent directories are created first. Non-ASCII characters are
    written as-is rather than escaped.
    """
    target_dir = path.parent
    target_dir.mkdir(exist_ok=True, parents=True)
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    path.write_text(serialized, encoding="utf-8")
# Normalized (stripped, lower-cased) spellings that mean "no usable source id".
_UNKNOWN_SOURCE_ID_MARKERS = frozenset({"", "unknown", "none", "null", "n/a", "nan"})


def is_unknown_source_id(value: Any) -> bool:
    """Return True when *value* does not carry a usable source id.

    Any falsy value (None, "", 0, empty containers, ...) is unknown. Otherwise
    the value is stringified, stripped and lower-cased, then checked against a
    fixed set of placeholder spellings ("unknown", "none", "null", "n/a",
    "nan", or empty after stripping).
    """
    if not value:
        return True
    normalized = str(value).strip().lower()
    return normalized in _UNKNOWN_SOURCE_ID_MARKERS
def compute_link_metrics(items: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarize the ``links`` of *items*: totals, unknown targets, distributions.

    Each item is expected to be a mapping with an optional ``links`` list. Each
    link is a mapping with optional ``relation`` and ``target_entity`` keys.
    The input may be raw JSON loaded from a snapshot file, so malformed data is
    skipped instead of raising: non-dict items, a missing/``None``/non-list
    ``links`` value, and non-dict link entries are all ignored.

    A missing or ``None`` relation is bucketed as "reference". A missing or
    ``None`` target is bucketed as "Unknown". A link counts as unknown when its
    target is "unknown" (case-insensitive) or blank after stripping.

    Returns:
        Dict with ``links_total``, ``unknown_links``, ``semantic_coverage_pct``
        (percent of links with a known target, rounded to 4 places; 0.0 when
        there are no links), ``relation_types_total``,
        ``relation_distribution`` and ``target_entity_distribution_top20``.
    """
    links_total = 0
    unknown_links = 0
    relation_dist: Counter[str] = Counter()
    target_dist: Counter[str] = Counter()
    for item in items:
        if not isinstance(item, dict):
            continue
        links = item.get("links")
        # Covers a missing key and an explicit JSON null (iterating None used to raise).
        if not isinstance(links, (list, tuple)):
            continue
        for link in links:
            if not isinstance(link, dict):
                continue
            links_total += 1
            raw_relation = link.get("relation")
            relation = "reference" if raw_relation is None else str(raw_relation)
            raw_target = link.get("target_entity")
            # A JSON-null target must count as unknown. str(None) == "None" used
            # to be counted as a known target and inflated the coverage.
            target = "Unknown" if raw_target is None else str(raw_target)
            relation_dist[relation] += 1
            target_dist[target] += 1
            if target.strip().lower() in {"unknown", ""}:
                unknown_links += 1
    semantic_coverage_pct = 0.0
    if links_total:
        semantic_coverage_pct = (links_total - unknown_links) / links_total * 100.0
    return {
        "links_total": links_total,
        "unknown_links": unknown_links,
        "semantic_coverage_pct": round(semantic_coverage_pct, 4),
        "relation_types_total": len(relation_dist),
        "relation_distribution": dict(relation_dist),
        "target_entity_distribution_top20": dict(target_dist.most_common(20)),
    }
def remap_snapshot(payload: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Re-run ``map_record`` over every item of an existing snapshot.

    Each dict in ``payload["items"]`` is mapped from its ``source_entity`` and
    ``attributes``. Non-dict items are skipped, and ``attributes`` that is not
    a dict is replaced by ``{}``.

    Mapped records are de-duplicated on ``(source_entity, source_id)``. The
    exception is records whose ``source_id`` is unknown (per
    ``is_unknown_source_id``), which are always kept. An unknown id carries no
    identity, so keying on it would silently drop distinct records and
    understate ``source_id_unknown_after``.

    Args:
        payload: Decoded source snapshot. ``year``, the ``selected_window_*``
            fields and ``truncated_entity_sets`` are copied through unchanged.

    Returns:
        ``(remapped_payload, metrics)``: the new snapshot document, and a
        before/after comparison of unknown source ids and link metrics.
    """
    raw_items = payload.get("items")
    # A missing, null or non-list "items" value is treated as an empty snapshot.
    source_items: list[Any] = raw_items if isinstance(raw_items, list) else []
    # Only dict rows can be remapped or measured. The metrics below must skip
    # non-dict rows too; calling .get() on one used to raise AttributeError.
    source_rows: list[dict[str, Any]] = [row for row in source_items if isinstance(row, dict)]

    remapped_items: list[dict[str, Any]] = []
    remapped_per_entity_set: Counter[str] = Counter()
    seen: set[tuple[str, str]] = set()
    duplicates_skipped = 0
    for raw_item in source_rows:
        source_entity = str(raw_item.get("source_entity", ""))
        attributes = raw_item.get("attributes", {})
        if not isinstance(attributes, dict):
            attributes = {}
        mapped = map_record(source_entity, attributes).model_dump()
        mapped_entity = str(mapped.get("source_entity", ""))
        mapped_id = mapped.get("source_id")
        # De-duplicate on real ids only; unknown ids say nothing about identity.
        if not is_unknown_source_id(mapped_id):
            key = (mapped_entity, str(mapped_id))
            if key in seen:
                duplicates_skipped += 1
                continue
            seen.add(key)
        remapped_items.append(mapped)
        remapped_per_entity_set[mapped_entity] += 1

    remapped_links_total = sum(len(item.get("links") or []) for item in remapped_items)
    remapped_payload = {
        "status": "success",
        "year": payload.get("year"),
        "selected_window_key": payload.get("selected_window_key"),
        "selected_window_start": payload.get("selected_window_start"),
        "selected_window_end_exclusive": payload.get("selected_window_end_exclusive"),
        "records_exported_total": len(remapped_items),
        "links_exported_total": remapped_links_total,
        "records_per_entity_set": dict(sorted(remapped_per_entity_set.items())),
        "truncated_entity_sets": payload.get("truncated_entity_sets", []),
        # NOTE(review): when the payload has no "snapshot_output", this falls
        # back to the default input file name, which is wrong for any other
        # --input-snapshot.
        "source_snapshot": payload.get("snapshot_output") or "pre_report_snapshot_2020_2020-06.json",
        "semantic_mapper_version": "v2",
        "duplicates_skipped": duplicates_skipped,
        "items": remapped_items,
    }

    old_link_metrics = compute_link_metrics(source_rows)
    new_link_metrics = compute_link_metrics(remapped_items)
    old_unknown_ids = sum(1 for row in source_rows if is_unknown_source_id(row.get("source_id")))
    new_unknown_ids = sum(1 for row in remapped_items if is_unknown_source_id(row.get("source_id")))
    metrics = {
        "status": "success",
        "source_snapshot_records": len(source_items),
        "remapped_snapshot_records": len(remapped_items),
        "duplicates_skipped": duplicates_skipped,
        "source_id_unknown_before": old_unknown_ids,
        "source_id_unknown_after": new_unknown_ids,
        "before": old_link_metrics,
        "after": new_link_metrics,
    }
    return remapped_payload, metrics
def main() -> int:
    """Run the remap end to end, write both outputs and print a JSON summary.

    Reads the input snapshot named on the command line and remaps it with
    ``remap_snapshot``. Writes the remapped snapshot and the metrics file,
    then prints a short before/after summary to stdout.

    Returns:
        0 on success.

    Raises:
        FileNotFoundError: if the input snapshot path does not exist.
    """
    args = parse_args()
    input_path, output_path, metrics_path = (
        Path(args.input_snapshot),
        Path(args.output_snapshot),
        Path(args.metrics_output),
    )
    if not input_path.exists():
        raise FileNotFoundError(f"Input snapshot not found: {input_path}")

    remapped_payload, metrics = remap_snapshot(load_json(input_path))
    write_json(output_path, remapped_payload)
    write_json(metrics_path, metrics)

    before, after = metrics["before"], metrics["after"]
    summary = {
        "status": "success",
        "input_snapshot": str(input_path),
        "output_snapshot": str(output_path),
        "metrics_output": str(metrics_path),
        "source_id_unknown_before": metrics["source_id_unknown_before"],
        "source_id_unknown_after": metrics["source_id_unknown_after"],
        "unknown_links_before": before["unknown_links"],
        "unknown_links_after": after["unknown_links"],
        "semantic_coverage_before": before["semantic_coverage_pct"],
        "semantic_coverage_after": after["semantic_coverage_pct"],
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())