319 lines
11 KiB
Python
319 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import time
|
|
from collections import defaultdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
import sys
|
|
from typing import Any, TextIO
|
|
|
|
# Directory two levels up from this file's resolved path, i.e. the parent of
# the directory that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Prepend that directory to sys.path (only once) — presumably so the
# project-local imports that follow resolve when the script is run directly.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
from canonical_layer.mappers import map_record
|
|
from canonical_layer.period_snapshot import normalize_dt, parse_record_datetime
|
|
from config.client import ODataClient, extract_entity_sets
|
|
from config.settings import load_settings
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the monthly snapshot export.

    Args:
        argv: Argument list to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, which is what the script entry point uses.

    Returns:
        Namespace with ``year``, ``output_dir``, ``page_size``,
        ``max_pages_per_set``, ``entity_set``, ``keyword`` and
        ``progress_every``. ``output_dir`` is always a string; when the option
        is omitted it is derived from ``year`` as
        ``<PROJECT_ROOT>/docs/ARCH/<year>_monthly_full_snapshots``.

    Raises:
        SystemExit: On invalid options, including a non-positive
            ``--page-size`` or ``--max-pages-per-set``.
    """
    parser = argparse.ArgumentParser(
        description="Export full monthly snapshots for a year into docs/ARCH",
    )
    parser.add_argument("--year", type=int, default=2020, help="Target year")
    parser.add_argument(
        "--output-dir",
        # Resolved after parsing so the default folder follows --year instead
        # of always pointing at the 2020 directory.
        default=None,
        help=(
            "Directory where monthly snapshots will be written "
            "(default: docs/ARCH/<year>_monthly_full_snapshots)"
        ),
    )
    parser.add_argument("--page-size", type=int, default=500, help="OData $top page size")
    parser.add_argument(
        "--max-pages-per-set",
        type=int,
        default=200,
        help="Safety cap for pages per entity set (set higher for deeper export)",
    )
    parser.add_argument(
        "--entity-set",
        action="append",
        default=None,
        help="Explicit entity set to include (repeat for multiple)",
    )
    parser.add_argument(
        "--keyword",
        action="append",
        default=None,
        help="Filter entity sets by keyword (repeat for multiple)",
    )
    parser.add_argument(
        "--progress-every",
        type=int,
        default=25,
        help="Print progress every N entity sets",
    )

    args = parser.parse_args(argv)

    # A page size or page cap below 1 cannot drive the $top/$skip paging loop
    # meaningfully, so reject it up front with a normal usage error.
    if args.page_size < 1:
        parser.error("--page-size must be a positive integer")
    if args.max_pages_per_set < 1:
        parser.error("--max-pages-per-set must be a positive integer")

    if args.output_dir is None:
        args.output_dir = str(
            PROJECT_ROOT / "docs" / "ARCH" / f"{args.year}_monthly_full_snapshots"
        )
    return args
|
|
|
|
|
|
def month_bounds(year: int, month: int) -> tuple[datetime, datetime]:
    """Return the UTC window ``(start, end)`` covering one calendar month.

    ``start`` is midnight UTC on the first day of ``month``. ``end`` is
    midnight UTC on the first day of the following month (January of
    ``year + 1`` when ``month`` is 12), so it is an exclusive upper bound.
    """
    first_day = datetime(year, month, 1, tzinfo=timezone.utc)
    # divmod(12, 12) == (1, 0): December carries into the next year and wraps
    # to index 0 (January); any other month simply advances by one.
    year_carry, month_index = divmod(month, 12)
    next_first_day = datetime(year + year_carry, month_index + 1, 1, tzinfo=timezone.utc)
    return first_day, next_first_day
|
|
|
|
|
|
def resolve_entity_sets(
    *,
    client: ODataClient,
    explicit_sets: list[str] | None,
    keywords: list[str] | None,
) -> list[str]:
    """Decide which entity sets to export, as a sorted list without duplicates.

    Args:
        client: Used to fetch the service metadata; only consulted when no
            explicit sets are given.
        explicit_sets: Names supplied by the caller. A non-empty list wins
            over everything else: names are stripped, blanks dropped, and the
            metadata is never fetched (so a list of only blank names yields
            an empty result).
        keywords: Case-insensitive substrings; when at least one non-blank
            keyword is given, only entity sets whose name contains any of them
            are kept. Otherwise every entity set from the metadata is returned.

    Returns:
        Sorted, de-duplicated entity set names.
    """
    if explicit_sets:
        cleaned = {name.strip() for name in explicit_sets if name and name.strip()}
        return sorted(cleaned)

    discovered = {entry["name"] for entry in extract_entity_sets(client.fetch_metadata())}

    needles = [word.strip().lower() for word in (keywords or []) if word.strip()]
    if not needles:
        return sorted(discovered)

    return sorted(
        name
        for name in discovered
        if any(needle in name.lower() for needle in needles)
    )
|
|
|
|
|
|
def open_month_streams(tmp_dir: Path, year: int) -> dict[int, TextIO]:
    """Open one UTF-8 NDJSON file per month of ``year`` for writing.

    Files are named ``snapshot_<year>-<MM>.ndjson`` inside ``tmp_dir`` and are
    truncated if they already exist.

    Args:
        tmp_dir: Existing directory that receives the twelve files.
        year: Year used in the file names.

    Returns:
        Mapping from month number (1-12) to its open text stream. The caller
        owns the streams and must close them.

    Raises:
        OSError: If any file cannot be opened. Streams opened before the
            failure are closed first so no handles leak.
    """
    streams: dict[int, TextIO] = {}
    try:
        for month in range(1, 13):
            ndjson_path = tmp_dir / f"snapshot_{year}-{month:02d}.ndjson"
            streams[month] = ndjson_path.open("w", encoding="utf-8")
    except OSError:
        # The caller never receives the partially built mapping, so release
        # whatever was opened so far before propagating the error.
        for stream in streams.values():
            stream.close()
        raise
    return streams
|
|
|
|
|
|
def close_month_streams(streams: dict[int, TextIO]) -> None:
    """Close every stream in ``streams``, even if closing one of them fails.

    Args:
        streams: Mapping of month number to open text stream.

    Raises:
        OSError: The first ``OSError`` raised by a ``close()`` call, re-raised
            only after all remaining streams have been closed as well.
    """
    first_error: OSError | None = None
    for stream in streams.values():
        try:
            stream.close()
        except OSError as exc:
            # Keep going so one failed flush does not leave the other
            # handles open; remember the first failure to report it.
            if first_error is None:
                first_error = exc
    if first_error is not None:
        raise first_error
|
|
|
|
|
|
def write_month_snapshot_json(
    *,
    output_path: Path,
    ndjson_path: Path,
    year: int,
    month: int,
    records_exported_total: int,
    links_exported_total: int,
    records_per_entity_set: dict[str, int],
    truncated_entity_sets: list[str],
    entity_sets_scanned_total: int,
) -> None:
    """Assemble one month's snapshot JSON document from its NDJSON file.

    The document is written by hand, piece by piece, so the records are
    streamed from ``ndjson_path`` into the ``"items"`` array one line at a
    time instead of being loaded into memory.

    Args:
        output_path: Destination JSON file (overwritten).
        ndjson_path: Source file holding one JSON object per line; blank
            lines are skipped and each line is copied verbatim.
        year: Snapshot year.
        month: Snapshot month (1-12).
        records_exported_total: Value written as ``records_exported_total``.
        links_exported_total: Value written as ``links_exported_total``.
        records_per_entity_set: Per-entity-set counts, embedded as JSON.
        truncated_entity_sets: Entity set names, embedded as a JSON array.
        entity_sets_scanned_total: Value written as
            ``entity_sets_scanned_total``.
    """
    window_start, window_end = month_bounds(year, month)
    month_key = f"{month:02d}"

    with output_path.open("w", encoding="utf-8") as out:
        per_set_json = json.dumps(records_per_entity_set, ensure_ascii=False, indent=2)
        truncated_json = json.dumps(truncated_entity_sets, ensure_ascii=False)
        header = [
            "{\n",
            ' "status": "success",\n',
            f' "year": {year},\n',
            f' "month": "{month_key}",\n',
            f' "selected_window_key": "{year}-{month_key}",\n',
            f' "selected_window_start": "{window_start.isoformat()}",\n',
            f' "selected_window_end_exclusive": "{window_end.isoformat()}",\n',
            f' "records_exported_total": {records_exported_total},\n',
            f' "links_exported_total": {links_exported_total},\n',
            f' "entity_sets_scanned_total": {entity_sets_scanned_total},\n',
            f' "records_per_entity_set": {per_set_json},\n',
            f' "truncated_entity_sets": {truncated_json},\n',
            ' "items": [\n',
        ]
        out.writelines(header)

        with ndjson_path.open("r", encoding="utf-8") as source:
            # The first item is only indented; every later item is preceded
            # by the comma that terminates the previous one.
            lead = " "
            for record in filter(None, map(str.strip, source)):
                out.write(lead + record)
                lead = ",\n "

        out.write("\n ]\n")
        out.write("}\n")
|
|
|
|
|
|
def _export_entity_set(
    *,
    client: ODataClient,
    entity_set: str,
    year: int,
    page_size: int,
    max_pages: int,
    month_streams: dict[int, TextIO],
    month_records_total: dict[int, int],
    month_links_total: dict[int, int],
    month_records_per_set: dict[int, dict[str, int]],
) -> dict[str, Any]:
    """Page through one entity set and append its in-year records to the month streams.

    Reads up to ``max_pages`` pages of ``page_size`` rows using ``$skip``
    paging. Each row whose parsed, normalized datetime falls in ``year`` is
    mapped, written as one JSON line to that month's stream, and counted in
    the three ``month_*`` accumulators (which are mutated in place).

    Returns:
        Per-entity-set statistics: ``pages_read``, ``records_scanned``,
        ``records_without_datetime``, ``records_in_year``, ``mapping_errors``
        and ``truncated_by_max_pages``.
    """
    pages_read = 0
    records_scanned = 0
    records_without_datetime = 0
    records_in_year = 0
    mapping_errors = 0
    truncated = False

    for page_idx in range(max_pages):
        batch = client.read_entity_set_records(
            entity_set,
            top=page_size,
            extra_params={"$skip": page_idx * page_size},
        )
        pages_read += 1

        if not batch:
            break

        records_scanned += len(batch)
        for row in batch:
            parsed = parse_record_datetime(row)
            if parsed is None:
                records_without_datetime += 1
                continue

            dt = normalize_dt(parsed)
            if dt.year != year:
                continue

            records_in_year += 1
            try:
                mapped = map_record(entity_set, row)
            except Exception:
                # Deliberate best effort: a row that cannot be mapped is
                # counted and skipped rather than aborting the whole export.
                mapping_errors += 1
                continue

            # NOTE(review): json.dumps raises TypeError on values it cannot
            # encode (e.g. datetime) — confirm model_dump() output is JSON-safe.
            payload = mapped.model_dump()
            month = dt.month
            month_streams[month].write(json.dumps(payload, ensure_ascii=False) + "\n")
            month_records_total[month] += 1
            month_links_total[month] += len(mapped.links)
            month_records_per_set[month][entity_set] += 1

        # A short page means the entity set is exhausted.
        if len(batch) < page_size:
            break
    else:
        # The loop ran out of pages without seeing an empty or short page, so
        # there may be more data beyond the safety cap.
        truncated = True

    return {
        "pages_read": pages_read,
        "records_scanned": records_scanned,
        "records_without_datetime": records_without_datetime,
        "records_in_year": records_in_year,
        "mapping_errors": mapping_errors,
        "truncated_by_max_pages": truncated,
    }


def main() -> int:
    """Run the export: scan entity sets, write monthly snapshots and a manifest.

    Steps:
        1. Parse options, load settings and build the OData client.
        2. Resolve the entity sets to scan.
        3. Stream every record of the target year into per-month NDJSON files
           under ``<output_dir>/_tmp_ndjson``.
        4. Turn each NDJSON file into ``snapshot_<year>-<MM>_full.json``.
        5. Write ``manifest_<year>_monthly_full.json`` and print a JSON summary.

    Returns:
        ``0`` on success (used as the process exit status).

    Raises:
        RuntimeError: If no entity sets were resolved for export.
    """
    args = parse_args()
    settings = load_settings()
    client = ODataClient(settings)

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    tmp_dir = output_dir / "_tmp_ndjson"
    tmp_dir.mkdir(parents=True, exist_ok=True)

    entity_sets = resolve_entity_sets(
        client=client,
        explicit_sets=args.entity_set,
        keywords=args.keyword,
    )
    if not entity_sets:
        raise RuntimeError("No entity sets resolved for export")

    months = range(1, 13)
    month_streams = open_month_streams(tmp_dir, args.year)
    month_records_per_set: dict[int, dict[str, int]] = {m: defaultdict(int) for m in months}
    month_records_total: dict[int, int] = {m: 0 for m in months}
    month_links_total: dict[int, int] = {m: 0 for m in months}

    entity_set_stats: dict[str, dict[str, Any]] = {}
    truncated_entity_sets: set[str] = set()
    # Monotonic clock: elapsed time must not jump if the wall clock is adjusted.
    started = time.monotonic()

    try:
        for index, entity_set in enumerate(entity_sets, start=1):
            stats = _export_entity_set(
                client=client,
                entity_set=entity_set,
                year=args.year,
                page_size=args.page_size,
                max_pages=args.max_pages_per_set,
                month_streams=month_streams,
                month_records_total=month_records_total,
                month_links_total=month_links_total,
                month_records_per_set=month_records_per_set,
            )
            entity_set_stats[entity_set] = stats
            if stats["truncated_by_max_pages"]:
                truncated_entity_sets.add(entity_set)

            if args.progress_every > 0 and index % args.progress_every == 0:
                elapsed = round(time.monotonic() - started, 2)
                print(
                    json.dumps(
                        {
                            "progress": {
                                "processed_entity_sets": index,
                                "entity_sets_total": len(entity_sets),
                                "elapsed_seconds": elapsed,
                            }
                        },
                        ensure_ascii=False,
                    )
                )
    finally:
        # Always release the twelve NDJSON handles, even when the scan fails.
        close_month_streams(month_streams)

    snapshot_files: list[str] = []
    for month in months:
        ndjson_path = tmp_dir / f"snapshot_{args.year}-{month:02d}.ndjson"
        snapshot_path = output_dir / f"snapshot_{args.year}-{month:02d}_full.json"
        write_month_snapshot_json(
            output_path=snapshot_path,
            ndjson_path=ndjson_path,
            year=args.year,
            month=month,
            records_exported_total=month_records_total[month],
            links_exported_total=month_links_total[month],
            records_per_entity_set=dict(sorted(month_records_per_set[month].items())),
            truncated_entity_sets=sorted(truncated_entity_sets),
            entity_sets_scanned_total=len(entity_sets),
        )
        snapshot_files.append(snapshot_path.name)

    manifest = {
        "status": "success",
        "year": args.year,
        "service_root": settings.service_root,
        "entity_sets_total": len(entity_sets),
        "entity_sets": entity_sets,
        "page_size": args.page_size,
        "max_pages_per_set": args.max_pages_per_set,
        "truncated_entity_sets_count": len(truncated_entity_sets),
        "truncated_entity_sets": sorted(truncated_entity_sets),
        "records_exported_per_month": {
            f"{args.year}-{month:02d}": month_records_total[month] for month in months
        },
        "links_exported_per_month": {
            f"{args.year}-{month:02d}": month_links_total[month] for month in months
        },
        "entity_set_stats": entity_set_stats,
        "snapshot_files": snapshot_files,
        "output_dir": str(output_dir),
        "generated_at_utc": datetime.now(timezone.utc).isoformat(),
    }
    manifest_path = output_dir / f"manifest_{args.year}_monthly_full.json"
    manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")

    summary = {
        "status": "success",
        "year": args.year,
        "output_dir": str(output_dir),
        "manifest_path": str(manifest_path),
        "snapshot_files_count": len(snapshot_files),
        "records_exported_total": sum(month_records_total.values()),
        "truncated_entity_sets_count": len(truncated_entity_sets),
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    sys.exit(main())
|