"""Export per-month snapshot JSON files of OData records for one calendar year.

For every resolved entity set the script pages through the OData service,
keeps the records whose parsed datetime falls in the target year, maps them
with ``map_record`` and appends them to a per-month NDJSON scratch file.  The
scratch files are then wrapped into one ``snapshot_<year>-<MM>_full.json`` per
month, and a manifest summarising the run is written next to them.
"""
from __future__ import annotations

import argparse
import json
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
import sys
from typing import Any, TextIO

# Make the project root importable when the script is run directly.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from canonical_layer.mappers import map_record
from canonical_layer.period_snapshot import normalize_dt, parse_record_datetime
from config.client import ODataClient, extract_entity_sets
from config.settings import load_settings


def parse_args() -> argparse.Namespace:
    """Parse command-line options.

    Returns:
        The parsed namespace.  ``output_dir`` is always a string: when the
        option is omitted it defaults to
        ``<PROJECT_ROOT>/docs/ARCH/<year>_monthly_full_snapshots`` so that the
        default directory follows ``--year`` instead of always pointing at the
        2020 directory.
    """
    parser = argparse.ArgumentParser(
        description="Export full monthly snapshots for a year into docs/ARCH",
    )
    parser.add_argument("--year", type=int, default=2020, help="Target year")
    parser.add_argument(
        "--output-dir",
        default=None,
        help=(
            "Directory where monthly snapshots will be written "
            "(default: docs/ARCH/<year>_monthly_full_snapshots)"
        ),
    )
    parser.add_argument("--page-size", type=int, default=500, help="OData $top page size")
    parser.add_argument(
        "--max-pages-per-set",
        type=int,
        default=200,
        help="Safety cap for pages per entity set (set higher for deeper export)",
    )
    parser.add_argument(
        "--entity-set",
        action="append",
        default=None,
        help="Explicit entity set to include (repeat for multiple)",
    )
    parser.add_argument(
        "--keyword",
        action="append",
        default=None,
        help="Filter entity sets by keyword (repeat for multiple)",
    )
    parser.add_argument(
        "--progress-every",
        type=int,
        default=25,
        help="Print progress every N entity sets",
    )
    args = parser.parse_args()

    # A non-positive page size makes "$skip = page_idx * page_size" stay at 0
    # (or go negative), so paging could never advance correctly.
    if args.page_size < 1:
        parser.error("--page-size must be a positive integer")

    if args.output_dir is None:
        args.output_dir = str(
            PROJECT_ROOT / "docs" / "ARCH" / f"{args.year}_monthly_full_snapshots"
        )
    return args


def month_bounds(year: int, month: int) -> tuple[datetime, datetime]:
    """Return the UTC ``(start, end_exclusive)`` datetimes of a calendar month."""
    start = datetime(year, month, 1, 0, 0, 0, tzinfo=timezone.utc)
    if month == 12:
        end = datetime(year + 1, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
    else:
        end = datetime(year, month + 1, 1, 0, 0, 0, tzinfo=timezone.utc)
    return start, end


def resolve_entity_sets(
    *,
    client: ODataClient,
    explicit_sets: list[str] | None,
    keywords: list[str] | None,
) -> list[str]:
    """Decide which entity sets to export.

    Args:
        client: Client used to fetch the service metadata when no explicit
            sets are given.
        explicit_sets: Entity set names supplied by the caller.  When this
            list is non-empty it wins and the metadata is not fetched.
        keywords: Case-insensitive substrings; an entity set from the
            metadata is kept when its name contains any of them.

    Returns:
        A sorted list of unique entity set names (possibly empty).
    """
    if explicit_sets:
        return sorted({item.strip() for item in explicit_sets if item and item.strip()})

    metadata_xml = client.fetch_metadata()
    all_sets = [item["name"] for item in extract_entity_sets(metadata_xml)]

    # Same blank/None guard as for explicit sets above.
    normalized_keywords = [k.strip().lower() for k in keywords or [] if k and k.strip()]
    if not normalized_keywords:
        return sorted(set(all_sets))

    filtered = {
        entity_set
        for entity_set in all_sets
        if any(token in entity_set.lower() for token in normalized_keywords)
    }
    return sorted(filtered)


def open_month_streams(tmp_dir: Path, year: int) -> dict[int, TextIO]:
    """Open one NDJSON scratch file per month (1-12) for writing.

    Existing files are truncated.  If opening any file fails, the streams
    opened so far are closed before the error propagates.
    """
    streams: dict[int, TextIO] = {}
    try:
        for month in range(1, 13):
            ndjson_path = tmp_dir / f"snapshot_{year}-{month:02d}.ndjson"
            streams[month] = ndjson_path.open("w", encoding="utf-8")
    except BaseException:
        # Do not leak the handles that were already opened.
        close_month_streams(streams)
        raise
    return streams


def close_month_streams(streams: dict[int, TextIO]) -> None:
    """Close every stream, even if closing one of them fails.

    Raises:
        OSError: The first error raised by a ``close()`` call, re-raised after
            all streams have been given the chance to close.
    """
    first_error: OSError | None = None
    for stream in streams.values():
        try:
            stream.close()
        except OSError as exc:
            if first_error is None:
                first_error = exc
    if first_error is not None:
        raise first_error


def write_month_snapshot_json(
    *,
    output_path: Path,
    ndjson_path: Path,
    year: int,
    month: int,
    records_exported_total: int,
    links_exported_total: int,
    records_per_entity_set: dict[str, int],
    truncated_entity_sets: list[str],
    entity_sets_scanned_total: int,
) -> None:
    """Wrap a month's NDJSON scratch file into a single snapshot JSON document.

    The header fields are written by hand and the NDJSON lines are copied one
    at a time into the ``items`` array, so the month's records are never held
    in memory all at once.  Lines are copied verbatim (after stripping
    whitespace); blank lines are skipped.
    """
    start, end = month_bounds(year, month)
    header = [
        "{\n",
        ' "status": "success",\n',
        f' "year": {year},\n',
        f' "month": "{month:02d}",\n',
        f' "selected_window_key": "{year}-{month:02d}",\n',
        f' "selected_window_start": "{start.isoformat()}",\n',
        f' "selected_window_end_exclusive": "{end.isoformat()}",\n',
        f' "records_exported_total": {records_exported_total},\n',
        f' "links_exported_total": {links_exported_total},\n',
        f' "entity_sets_scanned_total": {entity_sets_scanned_total},\n',
        ' "records_per_entity_set": ',
        json.dumps(records_per_entity_set, ensure_ascii=False, indent=2),
        ",\n",
        ' "truncated_entity_sets": ',
        json.dumps(truncated_entity_sets, ensure_ascii=False),
        ",\n",
        ' "items": [\n',
    ]

    with output_path.open("w", encoding="utf-8") as out:
        out.writelines(header)

        wrote_item = False
        with ndjson_path.open("r", encoding="utf-8") as source:
            for raw_line in source:
                line = raw_line.strip()
                if not line:
                    continue
                # The separator goes before every item except the first, so
                # the array never ends with a trailing comma.
                if wrote_item:
                    out.write(",\n")
                out.write(" ")
                out.write(line)
                wrote_item = True

        out.write("\n ]\n")
        out.write("}\n")


def _export_entity_set(
    *,
    client: ODataClient,
    entity_set: str,
    year: int,
    page_size: int,
    max_pages: int,
    month_streams: dict[int, TextIO],
    month_records_per_set: dict[int, dict[str, int]],
    month_records_total: dict[int, int],
    month_links_total: dict[int, int],
) -> dict[str, Any]:
    """Page through one entity set and append its in-year records per month.

    The per-month counters passed in are updated in place.

    Returns:
        The statistics dict stored in the manifest for this entity set.
    """
    pages_read = 0
    records_scanned = 0
    records_without_datetime = 0
    records_in_year = 0
    mapping_errors = 0
    truncated = False

    for page_idx in range(max_pages):
        batch = client.read_entity_set_records(
            entity_set,
            top=page_size,
            extra_params={"$skip": page_idx * page_size},
        )
        pages_read += 1
        if not batch:
            break
        records_scanned += len(batch)

        for row in batch:
            parsed = parse_record_datetime(row)
            if parsed is None:
                records_without_datetime += 1
                continue
            dt = normalize_dt(parsed)
            if dt.year != year:
                continue
            records_in_year += 1
            month = dt.month

            try:
                mapped = map_record(entity_set, row)
            except Exception:
                # Deliberate best-effort: one unmappable record must not abort
                # the export; the count is reported in the manifest.
                mapping_errors += 1
                continue

            # NOTE(review): json.dumps raises TypeError if the dumped payload
            # holds values that are not JSON-serialisable - confirm the mapped
            # models only produce JSON-compatible values.
            payload = mapped.model_dump()
            month_streams[month].write(json.dumps(payload, ensure_ascii=False) + "\n")
            month_records_total[month] += 1
            month_links_total[month] += len(mapped.links)
            month_records_per_set[month][entity_set] += 1

        # A short page means the entity set is exhausted.
        if len(batch) < page_size:
            break
    else:
        # The loop ran out of pages without hitting a short/empty page, so
        # the page cap may have cut the entity set off.
        truncated = True

    return {
        "pages_read": pages_read,
        "records_scanned": records_scanned,
        "records_without_datetime": records_without_datetime,
        "records_in_year": records_in_year,
        "mapping_errors": mapping_errors,
        "truncated_by_max_pages": truncated,
    }


def main() -> int:
    """Run the export and return the process exit code (0 on success).

    Raises:
        RuntimeError: If no entity sets could be resolved for export.
    """
    args = parse_args()
    settings = load_settings()
    client = ODataClient(settings)

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Scratch NDJSON files live here; they are left in place after the run.
    tmp_dir = output_dir / "_tmp_ndjson"
    tmp_dir.mkdir(parents=True, exist_ok=True)

    entity_sets = resolve_entity_sets(
        client=client,
        explicit_sets=args.entity_set,
        keywords=args.keyword,
    )
    if not entity_sets:
        raise RuntimeError("No entity sets resolved for export")

    months = range(1, 13)
    month_streams = open_month_streams(tmp_dir, args.year)
    month_records_per_set: dict[int, dict[str, int]] = {m: defaultdict(int) for m in months}
    month_records_total: dict[int, int] = {m: 0 for m in months}
    month_links_total: dict[int, int] = {m: 0 for m in months}
    entity_set_stats: dict[str, dict[str, Any]] = {}
    truncated_entity_sets: set[str] = set()

    started = time.time()
    try:
        for index, entity_set in enumerate(entity_sets, start=1):
            stats = _export_entity_set(
                client=client,
                entity_set=entity_set,
                year=args.year,
                page_size=args.page_size,
                max_pages=args.max_pages_per_set,
                month_streams=month_streams,
                month_records_per_set=month_records_per_set,
                month_records_total=month_records_total,
                month_links_total=month_links_total,
            )
            if stats["truncated_by_max_pages"]:
                truncated_entity_sets.add(entity_set)
            entity_set_stats[entity_set] = stats

            if args.progress_every > 0 and index % args.progress_every == 0:
                elapsed = round(time.time() - started, 2)
                print(
                    json.dumps(
                        {
                            "progress": {
                                "processed_entity_sets": index,
                                "entity_sets_total": len(entity_sets),
                                "elapsed_seconds": elapsed,
                            }
                        },
                        ensure_ascii=False,
                    )
                )
    finally:
        # The scratch files must be flushed and closed before they are read
        # back when the snapshots are assembled below.
        close_month_streams(month_streams)

    truncated_sorted = sorted(truncated_entity_sets)
    snapshot_files: list[str] = []
    for month in months:
        ndjson_path = tmp_dir / f"snapshot_{args.year}-{month:02d}.ndjson"
        snapshot_path = output_dir / f"snapshot_{args.year}-{month:02d}_full.json"
        write_month_snapshot_json(
            output_path=snapshot_path,
            ndjson_path=ndjson_path,
            year=args.year,
            month=month,
            records_exported_total=month_records_total[month],
            links_exported_total=month_links_total[month],
            records_per_entity_set=dict(sorted(month_records_per_set[month].items())),
            truncated_entity_sets=truncated_sorted,
            entity_sets_scanned_total=len(entity_sets),
        )
        snapshot_files.append(snapshot_path.name)

    manifest = {
        "status": "success",
        "year": args.year,
        "service_root": settings.service_root,
        "entity_sets_total": len(entity_sets),
        "entity_sets": entity_sets,
        "page_size": args.page_size,
        "max_pages_per_set": args.max_pages_per_set,
        "truncated_entity_sets_count": len(truncated_entity_sets),
        "truncated_entity_sets": truncated_sorted,
        "records_exported_per_month": {
            f"{args.year}-{month:02d}": month_records_total[month] for month in months
        },
        "links_exported_per_month": {
            f"{args.year}-{month:02d}": month_links_total[month] for month in months
        },
        "entity_set_stats": entity_set_stats,
        "snapshot_files": snapshot_files,
        "output_dir": str(output_dir),
        "generated_at_utc": datetime.now(timezone.utc).isoformat(),
    }
    manifest_path = output_dir / f"manifest_{args.year}_monthly_full.json"
    manifest_path.write_text(
        json.dumps(manifest, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    summary = {
        "status": "success",
        "year": args.year,
        "output_dir": str(output_dir),
        "manifest_path": str(manifest_path),
        "snapshot_files_count": len(snapshot_files),
        "records_exported_total": sum(month_records_total.values()),
        "truncated_entity_sets_count": len(truncated_entity_sets),
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())