"""Remap an existing snapshot with the semantic-v2 mapper rules.

The script reads a snapshot JSON file, runs every item through
``map_record``, drops duplicates keyed by ``(source_entity, source_id)``,
writes the remapped snapshot, and writes a before/after metrics report.
"""

from __future__ import annotations

import argparse
from collections import Counter
import json
from pathlib import Path
import sys
from typing import Any

# Make the project root importable when this file is run directly as a script;
# this must happen before the project-local imports below.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from canonical_layer.mappers import map_record  # noqa: E402
from config.settings import LOGS_DIR  # noqa: E402


def parse_args() -> argparse.Namespace:
    """Parse the command-line options (input, output and metrics paths)."""
    parser = argparse.ArgumentParser(
        description="Remap existing snapshot with semantic-v2 mapper rules"
    )
    parser.add_argument(
        "--input-snapshot",
        default=str(LOGS_DIR / "pre_report_snapshot_2020_2020-06.json"),
        help="Path to current snapshot json",
    )
    parser.add_argument(
        "--output-snapshot",
        default=str(LOGS_DIR / "pre_report_snapshot_2020_2020-06_semantic_v2.json"),
        help="Path to remapped snapshot json",
    )
    parser.add_argument(
        "--metrics-output",
        default=str(LOGS_DIR / "pre_report_snapshot_2020_2020-06_semantic_v2_metrics.json"),
        help="Path to before/after metrics json",
    )
    return parser.parse_args()


def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 text and return the decoded JSON document."""
    return json.loads(path.read_text(encoding="utf-8"))


def write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write *payload* to *path* as indented UTF-8 JSON.

    Missing parent directories are created first.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")


def is_unknown_source_id(value: Any) -> bool:
    """Return True when *value* is a placeholder rather than a real source id.

    Any falsy value (``None``, ``""``, ``0``, ...) is collapsed to the empty
    string before comparison, so it counts as unknown. Otherwise the
    stripped, lower-cased string form is compared against the placeholder
    spellings below.
    """
    text = str(value or "").strip().lower()
    return text in {"", "unknown", "none", "null", "n/a", "nan"}


def _links_of(item: Any) -> list[Any]:
    """Return the ``links`` sequence of *item*, or ``[]`` when it has none.

    Non-dict items and a missing, ``null`` or non-sequence ``links`` value
    all yield an empty list, so callers can iterate or take ``len`` safely.
    """
    if not isinstance(item, dict):
        return []
    links = item.get("links")
    if isinstance(links, (list, tuple)):
        return list(links)
    return []


def compute_link_metrics(items: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarise the links found on *items*.

    Only dict-shaped links on dict-shaped items are counted; anything else is
    skipped. A link is "unknown" when its ``target_entity`` (default
    ``"Unknown"``) lower-cases to ``"unknown"`` or the empty string.

    Returns:
        A dict with the total and unknown link counts, the percentage of
        links with a known target (rounded to 4 decimals, ``0.0`` when there
        are no links), the number of distinct relation types, the full
        relation distribution, and the 20 most common target entities.
    """
    links_total = 0
    unknown_links = 0
    relation_dist: Counter[str] = Counter()
    target_dist: Counter[str] = Counter()

    for item in items:
        # _links_of tolerates non-dict items and "links": null.
        for link in _links_of(item):
            if not isinstance(link, dict):
                continue
            links_total += 1
            relation = str(link.get("relation", "reference"))
            target = str(link.get("target_entity", "Unknown"))
            relation_dist[relation] += 1
            target_dist[target] += 1
            if target.lower() in {"unknown", ""}:
                unknown_links += 1

    semantic_coverage_pct = 0.0
    if links_total:
        semantic_coverage_pct = (links_total - unknown_links) / links_total * 100.0

    return {
        "links_total": links_total,
        "unknown_links": unknown_links,
        "semantic_coverage_pct": round(semantic_coverage_pct, 4),
        "relation_types_total": len(relation_dist),
        "relation_distribution": dict(relation_dist),
        "target_entity_distribution_top20": dict(target_dist.most_common(20)),
    }


def remap_snapshot(payload: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Remap every item of *payload* and compute before/after metrics.

    Each dict item is passed to ``map_record(source_entity, attributes)`` and
    the result's ``model_dump()`` is kept. Items whose
    ``(source_entity, source_id)`` pair was already seen are counted in
    ``duplicates_skipped`` and dropped. Non-dict items are ignored both when
    remapping and when computing metrics.

    Returns:
        ``(remapped_payload, metrics)`` where ``remapped_payload`` is the new
        snapshot document and ``metrics`` compares unknown source ids and
        link statistics before and after remapping.
    """
    # "items": null in the snapshot is treated like a missing key.
    source_items = payload.get("items") or []

    remapped_items: list[dict[str, Any]] = []
    remapped_per_entity_set: Counter[str] = Counter()
    seen: set[tuple[str, str]] = set()
    duplicates_skipped = 0

    for raw_item in source_items:
        if not isinstance(raw_item, dict):
            continue
        source_entity = str(raw_item.get("source_entity", ""))
        attributes = raw_item.get("attributes", {})
        if not isinstance(attributes, dict):
            attributes = {}

        mapped = map_record(source_entity, attributes).model_dump()
        key = (str(mapped.get("source_entity", "")), str(mapped.get("source_id", "")))
        if key in seen:
            duplicates_skipped += 1
            continue
        seen.add(key)
        remapped_items.append(mapped)
        remapped_per_entity_set[str(mapped.get("source_entity", ""))] += 1

    remapped_links_total = sum(len(_links_of(item)) for item in remapped_items)

    remapped_payload = {
        "status": "success",
        "year": payload.get("year"),
        "selected_window_key": payload.get("selected_window_key"),
        "selected_window_start": payload.get("selected_window_start"),
        "selected_window_end_exclusive": payload.get("selected_window_end_exclusive"),
        "records_exported_total": len(remapped_items),
        "links_exported_total": remapped_links_total,
        "records_per_entity_set": dict(
            sorted(remapped_per_entity_set.items(), key=lambda item: item[0])
        ),
        "truncated_entity_sets": payload.get("truncated_entity_sets", []),
        "source_snapshot": payload.get("snapshot_output")
        or "pre_report_snapshot_2020_2020-06.json",
        "semantic_mapper_version": "v2",
        "duplicates_skipped": duplicates_skipped,
        "items": remapped_items,
    }

    old_link_metrics = compute_link_metrics(source_items)
    new_link_metrics = compute_link_metrics(remapped_items)

    # Non-dict rows were skipped by the remap loop above, so they must not
    # crash (or be counted by) the "before" statistics either.
    old_unknown_ids = sum(
        1
        for row in source_items
        if isinstance(row, dict) and is_unknown_source_id(row.get("source_id"))
    )
    new_unknown_ids = sum(
        1 for row in remapped_items if is_unknown_source_id(row.get("source_id"))
    )

    metrics = {
        "status": "success",
        "source_snapshot_records": len(source_items),
        "remapped_snapshot_records": len(remapped_items),
        "duplicates_skipped": duplicates_skipped,
        "source_id_unknown_before": old_unknown_ids,
        "source_id_unknown_after": new_unknown_ids,
        "before": old_link_metrics,
        "after": new_link_metrics,
    }
    return remapped_payload, metrics


def main() -> int:
    """Run the remap end to end and print a JSON summary to stdout.

    Returns:
        ``0`` on success.

    Raises:
        FileNotFoundError: If the input snapshot path does not exist.
    """
    args = parse_args()
    input_path = Path(args.input_snapshot)
    output_path = Path(args.output_snapshot)
    metrics_path = Path(args.metrics_output)

    if not input_path.exists():
        raise FileNotFoundError(f"Input snapshot not found: {input_path}")

    source_payload = load_json(input_path)
    remapped_payload, metrics = remap_snapshot(source_payload)

    write_json(output_path, remapped_payload)
    write_json(metrics_path, metrics)

    print(
        json.dumps(
            {
                "status": "success",
                "input_snapshot": str(input_path),
                "output_snapshot": str(output_path),
                "metrics_output": str(metrics_path),
                "source_id_unknown_before": metrics["source_id_unknown_before"],
                "source_id_unknown_after": metrics["source_id_unknown_after"],
                "unknown_links_before": metrics["before"]["unknown_links"],
                "unknown_links_after": metrics["after"]["unknown_links"],
                "semantic_coverage_before": metrics["before"]["semantic_coverage_pct"],
                "semantic_coverage_after": metrics["after"]["semantic_coverage_pct"],
            },
            ensure_ascii=False,
            indent=2,
        )
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())