176 lines
6.5 KiB
Python
176 lines
6.5 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
from collections import Counter
|
|
import json
|
|
from pathlib import Path
|
|
import sys
|
|
from typing import Any
|
|
|
|
# Repository root: the parent of the directory that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]

# Put the repository root on the import path (once) so the project packages
# imported below resolve when this file is executed directly as a script.
if all(entry != str(PROJECT_ROOT) for entry in sys.path):
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
from canonical_layer.mappers import map_record
|
|
from config.settings import LOGS_DIR
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the semantic-v2 snapshot remap.

    Args:
        argv: Arguments to parse, excluding the program name. ``None`` (the
            default) parses ``sys.argv[1:]``, exactly as the zero-argument
            call always did; passing a list lets callers and tests drive the
            CLI without touching ``sys.argv``.

    Returns:
        Namespace with string attributes ``input_snapshot``,
        ``output_snapshot`` and ``metrics_output``. Their defaults are the
        ``pre_report_snapshot_2020_2020-06*`` files under ``LOGS_DIR``.
    """
    parser = argparse.ArgumentParser(description="Remap existing snapshot with semantic-v2 mapper rules")
    parser.add_argument(
        "--input-snapshot",
        default=str(LOGS_DIR / "pre_report_snapshot_2020_2020-06.json"),
        help="Path to current snapshot json",
    )
    parser.add_argument(
        "--output-snapshot",
        default=str(LOGS_DIR / "pre_report_snapshot_2020_2020-06_semantic_v2.json"),
        help="Path to remapped snapshot json",
    )
    parser.add_argument(
        "--metrics-output",
        default=str(LOGS_DIR / "pre_report_snapshot_2020_2020-06_semantic_v2_metrics.json"),
        help="Path to before/after metrics json",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return its top-level object.

    The file is decoded with ``utf-8-sig`` so that a leading UTF-8 byte-order
    mark (which ``json.loads`` otherwise rejects) is tolerated; plain UTF-8
    files decode identically.

    Raises:
        OSError: the file cannot be read (e.g. ``FileNotFoundError``).
        json.JSONDecodeError: the content is not valid JSON.
        ValueError: the top-level JSON value is not an object. Callers use
            ``.get`` on the result and would otherwise fail later with an
            unhelpful ``AttributeError``.
    """
    data = json.loads(path.read_text(encoding="utf-8-sig"))
    if not isinstance(data, dict):
        raise ValueError(
            f"Expected a JSON object at the top level of {path}, got {type(data).__name__}"
        )
    return data
|
|
|
|
|
|
def write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write *payload* to *path* as pretty-printed (indent=2) UTF-8 JSON.

    Missing parent directories are created. The text is written to a sibling
    ``<name>.tmp`` file that is then moved over *path*. As a result, a run
    interrupted mid-write leaves any previous file at *path* intact instead
    of truncated. If the write fails, the temporary file is removed and the
    error propagates.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    text = json.dumps(payload, ensure_ascii=False, indent=2)
    # Same directory as the target so the final rename stays on one filesystem.
    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        tmp_path.write_text(text, encoding="utf-8")
        # Path.replace overwrites an existing destination (os.replace semantics).
        tmp_path.replace(path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise
|
|
|
|
|
|
def is_unknown_source_id(value: Any) -> bool:
    """Return True when *value* does not identify a source record.

    Any falsy value (``None``, ``""``, ``0``, ``False``, an empty container)
    is unknown outright. Anything else is stringified, trimmed and
    lower-cased, then compared against the placeholder spellings
    ``"unknown"``, ``"none"``, ``"null"``, ``"n/a"`` and ``"nan"`` (and the
    empty string, which covers whitespace-only input).
    """
    if not value:
        return True
    normalized = str(value).strip().lower()
    placeholders = ("", "unknown", "none", "null", "n/a", "nan")
    return normalized in placeholders
|
|
|
|
|
|
def compute_link_metrics(items: list[Any]) -> dict[str, Any]:
    """Summarise the ``links`` of a list of snapshot items.

    Items are expected to be dicts with an optional ``links`` list whose
    entries are dicts carrying ``relation`` and ``target_entity``. Input that
    does not fit that shape is ignored instead of raising: non-dict items, a
    missing/null/non-list ``links`` value, and non-dict link entries.

    A missing or null ``relation`` is counted as ``"reference"``, and a
    missing or null ``target_entity`` as ``"Unknown"``. A link counts as
    unknown when its target, ignoring surrounding whitespace and case, is
    empty or ``"unknown"``.

    Returns:
        A dict with these keys:
        - ``links_total``
        - ``unknown_links``
        - ``semantic_coverage_pct``: percentage of links with a known target,
          rounded to 4 places; 0.0 when there are no links.
        - ``relation_types_total``
        - ``relation_distribution``
        - ``target_entity_distribution_top20``: the 20 most frequent targets.
    """
    links_total = 0
    unknown_links = 0
    relation_dist: Counter[str] = Counter()
    target_dist: Counter[str] = Counter()

    for item in items:
        # Raw snapshot rows may be non-dicts, and "links" may be explicitly null.
        if not isinstance(item, dict):
            continue
        links = item.get("links")
        if not isinstance(links, (list, tuple)):
            continue
        for link in links:
            if not isinstance(link, dict):
                continue
            links_total += 1
            raw_relation = link.get("relation")
            raw_target = link.get("target_entity")
            # An explicit null behaves like a missing key instead of becoming
            # the literal string "None" (which would also dodge the unknown check).
            relation = "reference" if raw_relation is None else str(raw_relation)
            target = "Unknown" if raw_target is None else str(raw_target)
            relation_dist[relation] += 1
            target_dist[target] += 1
            if target.strip().lower() in {"unknown", ""}:
                unknown_links += 1

    semantic_coverage_pct = 0.0
    if links_total:
        semantic_coverage_pct = (links_total - unknown_links) / links_total * 100.0

    return {
        "links_total": links_total,
        "unknown_links": unknown_links,
        "semantic_coverage_pct": round(semantic_coverage_pct, 4),
        "relation_types_total": len(relation_dist),
        "relation_distribution": dict(relation_dist),
        "target_entity_distribution_top20": dict(target_dist.most_common(20)),
    }
|
|
|
|
|
|
def remap_snapshot(payload: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Re-run the semantic-v2 mapper over every item of an existing snapshot.

    Each dict item's ``source_entity`` and ``attributes`` are passed to
    ``map_record`` and the dumped result is kept. Mapped records are
    de-duplicated on ``(source_entity, source_id)``, but only when the mapped
    ``source_id`` is known (per ``is_unknown_source_id``). A record without a
    usable id carries no identity. Treating all such records in an entity set
    as duplicates of the first would drop distinct records and understate
    ``source_id_unknown_after``.

    Non-dict rows in ``items`` are skipped everywhere, including the "before"
    metrics, and a missing or null ``items`` is treated as empty.

    Args:
        payload: Parsed snapshot document. Reads ``items`` plus the
            pass-through keys ``year``, ``selected_window_key``,
            ``selected_window_start``, ``selected_window_end_exclusive``,
            ``truncated_entity_sets`` and ``snapshot_output``.

    Returns:
        ``(remapped_payload, metrics)``: the new snapshot document and a
        before/after comparison of unknown source ids and link coverage.
    """
    raw_items = payload.get("items") or []
    source_items = [row for row in raw_items if isinstance(row, dict)]

    remapped_items: list[dict[str, Any]] = []
    remapped_per_entity_set: Counter[str] = Counter()
    seen: set[tuple[str, str]] = set()
    duplicates_skipped = 0

    for raw_item in source_items:
        source_entity = str(raw_item.get("source_entity", ""))
        attributes = raw_item.get("attributes", {})
        if not isinstance(attributes, dict):
            attributes = {}
        mapped = map_record(source_entity, attributes).model_dump()

        mapped_entity = str(mapped.get("source_entity", ""))
        mapped_id = mapped.get("source_id")
        # Only records with a real id can be recognised as duplicates.
        if not is_unknown_source_id(mapped_id):
            key = (mapped_entity, str(mapped_id))
            if key in seen:
                duplicates_skipped += 1
                continue
            seen.add(key)

        remapped_items.append(mapped)
        remapped_per_entity_set[mapped_entity] += 1

    remapped_links_total = sum(len(item.get("links") or []) for item in remapped_items)

    remapped_payload = {
        "status": "success",
        "year": payload.get("year"),
        "selected_window_key": payload.get("selected_window_key"),
        "selected_window_start": payload.get("selected_window_start"),
        "selected_window_end_exclusive": payload.get("selected_window_end_exclusive"),
        "records_exported_total": len(remapped_items),
        "links_exported_total": remapped_links_total,
        "records_per_entity_set": dict(sorted(remapped_per_entity_set.items(), key=lambda item: item[0])),
        "truncated_entity_sets": payload.get("truncated_entity_sets", []),
        "source_snapshot": payload.get("snapshot_output") or "pre_report_snapshot_2020_2020-06.json",
        "semantic_mapper_version": "v2",
        "duplicates_skipped": duplicates_skipped,
        "items": remapped_items,
    }

    old_link_metrics = compute_link_metrics(source_items)
    new_link_metrics = compute_link_metrics(remapped_items)
    old_unknown_ids = sum(1 for row in source_items if is_unknown_source_id(row.get("source_id")))
    new_unknown_ids = sum(1 for row in remapped_items if is_unknown_source_id(row.get("source_id")))

    metrics = {
        "status": "success",
        "source_snapshot_records": len(raw_items),
        "remapped_snapshot_records": len(remapped_items),
        "duplicates_skipped": duplicates_skipped,
        "source_id_unknown_before": old_unknown_ids,
        "source_id_unknown_after": new_unknown_ids,
        "before": old_link_metrics,
        "after": new_link_metrics,
    }

    return remapped_payload, metrics
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: remap a snapshot and write the new snapshot plus metrics.

    Reads ``--input-snapshot`` and remaps it with ``remap_snapshot``. It then
    writes the result to ``--output-snapshot`` and the before/after metrics
    to ``--metrics-output``, prints a JSON summary to stdout, and returns 0.

    Raises:
        FileNotFoundError: ``--input-snapshot`` is not an existing regular file.
        ValueError: two of the three paths resolve to the same file. Running
            on would overwrite the source snapshot or clobber one output
            with the other.
    """
    args = parse_args()
    input_path = Path(args.input_snapshot)
    output_path = Path(args.output_snapshot)
    metrics_path = Path(args.metrics_output)

    # is_file() rather than exists(): a directory passes exists() and would
    # only fail later, with a less helpful error, when it is read.
    if not input_path.is_file():
        raise FileNotFoundError(f"Input snapshot not found: {input_path}")

    resolved_paths = {input_path.resolve(), output_path.resolve(), metrics_path.resolve()}
    if len(resolved_paths) != 3:
        raise ValueError(
            "--input-snapshot, --output-snapshot and --metrics-output must be three "
            f"different files (got {input_path}, {output_path}, {metrics_path})"
        )

    source_payload = load_json(input_path)
    remapped_payload, metrics = remap_snapshot(source_payload)
    write_json(output_path, remapped_payload)
    write_json(metrics_path, metrics)

    summary = {
        "status": "success",
        "input_snapshot": str(input_path),
        "output_snapshot": str(output_path),
        "metrics_output": str(metrics_path),
        "source_id_unknown_before": metrics["source_id_unknown_before"],
        "source_id_unknown_after": metrics["source_id_unknown_after"],
        "unknown_links_before": metrics["before"]["unknown_links"],
        "unknown_links_after": metrics["after"]["unknown_links"],
        "semantic_coverage_before": metrics["before"]["semantic_coverage_pct"],
        "semantic_coverage_after": metrics["after"]["semantic_coverage_pct"],
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())
|