from __future__ import annotations import argparse import hashlib import json import time from collections import defaultdict from datetime import datetime, timezone from pathlib import Path import sys from typing import Any, TextIO PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from canonical_layer.mappers import map_record from canonical_layer.period_snapshot import normalize_dt, parse_dt, parse_record_datetime from config.client import ODataClient, extract_entity_sets from config.settings import load_settings DATE_FIELD_HINTS = ( "date", "period", "posted", "datetime", "timestamp", "дата", "период", "проведен", "момент", ) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Export full cumulative monthly snapshots for company data", ) parser.add_argument("--year", type=int, default=2020, help="Target year") parser.add_argument( "--output-dir", default=str(PROJECT_ROOT / "docs" / "ARCH" / "2020_monthly_company_asof_full"), help="Directory where monthly snapshots will be written", ) parser.add_argument("--page-size", type=int, default=500, help="OData $top page size") parser.add_argument( "--max-pages-per-set", type=int, default=0, help="Max pages per entity set, 0 = unlimited", ) parser.add_argument( "--entity-set", action="append", default=None, help="Explicit entity set to include (repeat for multiple)", ) parser.add_argument( "--keyword", action="append", default=None, help="Filter entity sets by keyword (repeat for multiple)", ) parser.add_argument( "--include-undated", choices=("all_months", "skip"), default="all_months", help="How to treat records without detected date", ) parser.add_argument( "--progress-every", type=int, default=10, help="Print progress every N entity sets", ) parser.add_argument( "--progress-pages-every", type=int, default=50, help="Print progress every N pages inside current entity set (0 to disable)", ) parser.add_argument( "--flush-every-records", type=int, default=5000, help="Flush NDJSON streams every N written records (0 to disable)", ) return parser.parse_args() def month_bounds(year: int, month: int) -> tuple[datetime, datetime]: start = datetime(year, month, 1, 0, 0, 0, tzinfo=timezone.utc) if month == 12: end = datetime(year + 1, 1, 1, 0, 0, 0, tzinfo=timezone.utc) else: end = datetime(year, month + 1, 1, 0, 0, 0, tzinfo=timezone.utc) return start, end def resolve_entity_sets( *, client: ODataClient, explicit_sets: list[str] | None, keywords: list[str] | None, ) -> list[str]: if explicit_sets: unique = sorted({item.strip() for item in explicit_sets if item and item.strip()}) return unique metadata_xml = client.fetch_metadata() all_sets = [item["name"] for item in extract_entity_sets(metadata_xml)] if not keywords: return sorted(set(all_sets)) normalized_keywords = [k.strip().lower() for k in keywords if k.strip()] if not normalized_keywords: return sorted(set(all_sets)) filtered = [ entity_set for entity_set in all_sets if any(token in entity_set.lower() for token in normalized_keywords) ] return sorted(set(filtered)) def open_month_streams(tmp_dir: Path, year: int) -> dict[int, TextIO]: streams: dict[int, TextIO] = {} for month in range(1, 13): ndjson_path = tmp_dir / f"snapshot_{year}-{month:02d}.ndjson" streams[month] = ndjson_path.open("w", encoding="utf-8") return streams def close_month_streams(streams: dict[int, TextIO]) -> None: for stream in streams.values(): stream.close() def write_month_snapshot_json( *, output_path: Path, ndjson_path: Path, year: int, month: int, records_exported_total: int, links_exported_total: int, records_per_entity_set: dict[str, int], truncated_entity_sets: list[str], entity_sets_scanned_total: int, dated_records_total: int, undated_records_total: int, ) -> None: start, end = month_bounds(year, month) with output_path.open("w", encoding="utf-8") as out: out.write("{\n") out.write(' "status": "success",\n') out.write(' "snapshot_mode": "cumulative_asof",\n') out.write(f' "year": {year},\n') out.write(f' "month": "{month:02d}",\n') out.write(f' "selected_window_key": "{year}-{month:02d}",\n') out.write(f' "selected_window_start": "{start.isoformat()}",\n') out.write(f' "selected_window_end_exclusive": "{end.isoformat()}",\n') out.write(f' "records_exported_total": {records_exported_total},\n') out.write(f' "dated_records_exported_total": {dated_records_total},\n') out.write(f' "undated_records_exported_total": {undated_records_total},\n') out.write(f' "links_exported_total": {links_exported_total},\n') out.write(f' "entity_sets_scanned_total": {entity_sets_scanned_total},\n') out.write(' "records_per_entity_set": ') out.write(json.dumps(records_per_entity_set, ensure_ascii=False, indent=2)) out.write(",\n") out.write(' "truncated_entity_sets": ') out.write(json.dumps(truncated_entity_sets, ensure_ascii=False)) out.write(",\n") out.write(' "items": [\n') line_count = 0 with ndjson_path.open("r", encoding="utf-8") as source: for raw_line in source: line = raw_line.strip() if not line: continue if line_count > 0: out.write(",\n") out.write(" ") out.write(line) line_count += 1 out.write("\n ]\n") out.write("}\n") def detect_record_datetime(record: dict[str, Any]) -> datetime | None: parsed = parse_record_datetime(record) if parsed is not None: return normalize_dt(parsed) for field, value in record.items(): if value is None: continue field_name = str(field).strip().lower() if not any(token in field_name for token in DATE_FIELD_HINTS): continue parsed_fallback = parse_dt(str(value)) if parsed_fallback is not None: return normalize_dt(parsed_fallback) return None def row_signature(row: dict[str, Any]) -> str: source_id = ( row.get("Ref_Key") or row.get("Ref") or row.get("ID") or row.get("Id") or row.get("id") or "" ) if source_id: return str(source_id) raw = json.dumps(row, ensure_ascii=False, sort_keys=True, separators=(",", ":")) return hashlib.sha1(raw.encode("utf-8")).hexdigest() def months_for_record(*, year: int, dt: datetime | None, include_undated: str) -> list[int]: if dt is None: if include_undated == "all_months": return list(range(1, 13)) return [] if dt.year > year: return [] if dt.year < year: return list(range(1, 13)) return list(range(dt.month, 13)) def main() -> int: args = parse_args() settings = load_settings() client = ODataClient(settings) output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) tmp_dir = output_dir / "_tmp_ndjson" tmp_dir.mkdir(parents=True, exist_ok=True) entity_sets = resolve_entity_sets( client=client, explicit_sets=args.entity_set, keywords=args.keyword, ) if not entity_sets: raise RuntimeError("No entity sets resolved for export") month_streams = open_month_streams(tmp_dir, args.year) month_records_per_set: dict[int, dict[str, int]] = {m: defaultdict(int) for m in range(1, 13)} month_records_total: dict[int, int] = {m: 0 for m in range(1, 13)} month_links_total: dict[int, int] = {m: 0 for m in range(1, 13)} month_dated_records_total: dict[int, int] = {m: 0 for m in range(1, 13)} month_undated_records_total: dict[int, int] = {m: 0 for m in range(1, 13)} entity_set_stats: dict[str, dict[str, Any]] = {} truncated_entity_sets: set[str] = set() started = time.time() written_records_total = 0 try: for index, entity_set in enumerate(entity_sets, start=1): pages_read = 0 records_scanned = 0 records_mapped = 0 records_with_datetime = 0 records_without_datetime = 0 records_skipped_future = 0 mapping_errors = 0 pagination_repeat_guard_hits = 0 truncated = False prev_first_signature: str | None = None page_idx = 0 while True: if args.max_pages_per_set > 0 and page_idx >= args.max_pages_per_set: truncated = True truncated_entity_sets.add(entity_set) break skip = page_idx * args.page_size batch = client.read_entity_set_records( entity_set, top=args.page_size, extra_params={"$skip": skip}, ) pages_read += 1 page_idx += 1 if not batch: break if args.progress_pages_every > 0 and (page_idx % args.progress_pages_every == 0): elapsed = round(time.time() - started, 2) print( json.dumps( { "page_progress": { "entity_set": entity_set, "entity_set_index": index, "entity_sets_total": len(entity_sets), "page": page_idx, "records_scanned_in_set": records_scanned, "elapsed_seconds": elapsed, } }, ensure_ascii=False, ) ) current_first_signature = row_signature(batch[0]) if ( prev_first_signature is not None and current_first_signature == prev_first_signature and len(batch) == args.page_size ): # Defensive guard against broken pagination that repeats the same page forever. pagination_repeat_guard_hits += 1 truncated = True truncated_entity_sets.add(entity_set) break prev_first_signature = current_first_signature records_scanned += len(batch) for row in batch: dt = detect_record_datetime(row) if dt is None: records_without_datetime += 1 else: records_with_datetime += 1 if dt.year > args.year: records_skipped_future += 1 continue try: mapped = map_record(entity_set, row) except Exception: mapping_errors += 1 continue payload = mapped.model_dump() encoded = json.dumps(payload, ensure_ascii=False) months = months_for_record(year=args.year, dt=dt, include_undated=args.include_undated) for month in months: month_streams[month].write(encoded) month_streams[month].write("\n") month_records_total[month] += 1 month_links_total[month] += len(mapped.links) month_records_per_set[month][entity_set] += 1 if dt is None: month_undated_records_total[month] += 1 else: month_dated_records_total[month] += 1 records_mapped += 1 written_records_total += len(months) if args.flush_every_records > 0 and (written_records_total % args.flush_every_records == 0): for stream in month_streams.values(): stream.flush() if len(batch) < args.page_size: break entity_set_stats[entity_set] = { "pages_read": pages_read, "records_scanned": records_scanned, "records_mapped": records_mapped, "records_with_datetime": records_with_datetime, "records_without_datetime": records_without_datetime, "records_skipped_future": records_skipped_future, "mapping_errors": mapping_errors, "truncated_by_limit_or_guard": truncated, "pagination_repeat_guard_hits": pagination_repeat_guard_hits, } if args.progress_every > 0 and index % args.progress_every == 0: elapsed = round(time.time() - started, 2) print( json.dumps( { "progress": { "processed_entity_sets": index, "entity_sets_total": len(entity_sets), "elapsed_seconds": elapsed, "records_exported_total_so_far": sum(month_records_total.values()), } }, ensure_ascii=False, ) ) finally: close_month_streams(month_streams) snapshot_files: list[str] = [] for month in range(1, 13): ndjson_path = tmp_dir / f"snapshot_{args.year}-{month:02d}.ndjson" snapshot_path = output_dir / f"snapshot_{args.year}-{month:02d}_asof_full.json" write_month_snapshot_json( output_path=snapshot_path, ndjson_path=ndjson_path, year=args.year, month=month, records_exported_total=month_records_total[month], links_exported_total=month_links_total[month], records_per_entity_set=dict(sorted(month_records_per_set[month].items())), truncated_entity_sets=sorted(truncated_entity_sets), entity_sets_scanned_total=len(entity_sets), dated_records_total=month_dated_records_total[month], undated_records_total=month_undated_records_total[month], ) snapshot_files.append(snapshot_path.name) manifest = { "status": "success", "snapshot_mode": "cumulative_asof", "year": args.year, "service_root": settings.service_root, "entity_sets_total": len(entity_sets), "entity_sets": entity_sets, "page_size": args.page_size, "max_pages_per_set": args.max_pages_per_set, "include_undated": args.include_undated, "truncated_entity_sets_count": len(truncated_entity_sets), "truncated_entity_sets": sorted(truncated_entity_sets), "records_exported_per_month": { f"{args.year}-{month:02d}": month_records_total[month] for month in range(1, 13) }, "dated_records_exported_per_month": { f"{args.year}-{month:02d}": month_dated_records_total[month] for month in range(1, 13) }, "undated_records_exported_per_month": { f"{args.year}-{month:02d}": month_undated_records_total[month] for month in range(1, 13) }, "links_exported_per_month": { f"{args.year}-{month:02d}": month_links_total[month] for month in range(1, 13) }, "entity_set_stats": entity_set_stats, "snapshot_files": snapshot_files, "output_dir": str(output_dir), "generated_at_utc": datetime.now(timezone.utc).isoformat(), } manifest_path = output_dir / f"manifest_{args.year}_monthly_company_asof_full.json" manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8") summary = { "status": "success", "snapshot_mode": "cumulative_asof", "year": args.year, "output_dir": str(output_dir), "manifest_path": str(manifest_path), "snapshot_files_count": len(snapshot_files), "records_exported_total_across_month_files": sum(month_records_total.values()), "last_month_records_total": month_records_total[12], "truncated_entity_sets_count": len(truncated_entity_sets), } print(json.dumps(summary, ensure_ascii=False, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())