472 lines
17 KiB
Python
472 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import time
|
|
from collections import defaultdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
import sys
|
|
from typing import Any, TextIO
|
|
|
|
# Parent of the directory that contains this file (two levels above the file itself).
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Put that directory at the front of sys.path, once. Presumably this lets the
# project-local imports that follow resolve when the file is run directly as
# a script -- confirm against the repository layout.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
from canonical_layer.mappers import map_record
|
|
from canonical_layer.period_snapshot import normalize_dt, parse_dt, parse_record_datetime
|
|
from config.client import ODataClient, extract_entity_sets
|
|
from config.settings import load_settings
|
|
|
|
|
|
# Lowercase substrings that mark a field name as a likely date/time column.
# detect_record_datetime() lower-cases each field name and tests these hints
# with ``in``, so a hint matches anywhere inside the name.
DATE_FIELD_HINTS = (
    "date",
    "period",
    "posted",
    "datetime",
    "timestamp",
    "дата",  # Russian: "date"
    "период",  # Russian: "period"
    "проведен",  # Russian: "posted"
    "момент",  # Russian: "moment" (point in time)
)
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the snapshot exporter.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which keeps the original call style
            ``parse_args()`` working.

    Returns:
        The populated namespace. ``output_dir`` is always a string: when the
        option is omitted it defaults to
        ``PROJECT_ROOT/docs/ARCH/<year>_monthly_company_asof_full`` so the
        directory name follows ``--year`` instead of being pinned to 2020.

    Raises:
        SystemExit: On invalid arguments, including a non-positive
            ``--page-size``.
    """
    parser = argparse.ArgumentParser(
        description="Export full cumulative monthly snapshots for company data",
    )
    parser.add_argument("--year", type=int, default=2020, help="Target year")
    parser.add_argument(
        "--output-dir",
        # Resolved after parsing because the default depends on --year.
        default=None,
        help=(
            "Directory where monthly snapshots will be written "
            "(default: <project>/docs/ARCH/<year>_monthly_company_asof_full)"
        ),
    )
    parser.add_argument("--page-size", type=int, default=500, help="OData $top page size")
    parser.add_argument(
        "--max-pages-per-set",
        type=int,
        default=0,
        help="Max pages per entity set, 0 = unlimited",
    )
    parser.add_argument(
        "--entity-set",
        action="append",
        default=None,
        help="Explicit entity set to include (repeat for multiple)",
    )
    parser.add_argument(
        "--keyword",
        action="append",
        default=None,
        help="Filter entity sets by keyword (repeat for multiple)",
    )
    parser.add_argument(
        "--include-undated",
        choices=("all_months", "skip"),
        default="all_months",
        help="How to treat records without detected date",
    )
    parser.add_argument(
        "--progress-every",
        type=int,
        default=10,
        help="Print progress every N entity sets",
    )
    parser.add_argument(
        "--progress-pages-every",
        type=int,
        default=50,
        help="Print progress every N pages inside current entity set (0 to disable)",
    )
    parser.add_argument(
        "--flush-every-records",
        type=int,
        default=5000,
        help="Flush NDJSON streams every N written records (0 to disable)",
    )

    args = parser.parse_args(argv)

    # The page size is used both as $top and to compute $skip; a value below 1
    # cannot page through an entity set.
    if args.page_size < 1:
        parser.error("--page-size must be a positive integer")

    if args.output_dir is None:
        args.output_dir = str(
            PROJECT_ROOT / "docs" / "ARCH" / f"{args.year}_monthly_company_asof_full"
        )
    return args
|
|
|
|
|
|
def month_bounds(year: int, month: int) -> tuple[datetime, datetime]:
    """Return the UTC window ``(start, end_exclusive)`` of one calendar month.

    ``start`` is midnight on the first day of ``month``; ``end_exclusive`` is
    midnight on the first day of the following month.
    """
    first_day = datetime(year, month, 1, tzinfo=timezone.utc)
    # December rolls over into January of the next year.
    next_year, next_month = (year + 1, 1) if month == 12 else (year, month + 1)
    return first_day, datetime(next_year, next_month, 1, tzinfo=timezone.utc)
|
|
|
|
|
|
def resolve_entity_sets(
    *,
    client: ODataClient,
    explicit_sets: list[str] | None,
    keywords: list[str] | None,
) -> list[str]:
    """Decide which entity sets to export; the result is sorted and de-duplicated.

    When ``explicit_sets`` is non-empty, the stripped, non-blank names are
    returned and the service metadata is not fetched. Otherwise the names come
    from the service metadata, optionally narrowed to those containing at
    least one of ``keywords`` (case-insensitive substring match).
    """
    if explicit_sets:
        requested = {name.strip() for name in explicit_sets if name and name.strip()}
        return sorted(requested)

    discovered = {entry["name"] for entry in extract_entity_sets(client.fetch_metadata())}

    # Blank keywords are dropped; with nothing left the filter is a no-op.
    needles = [word.strip().lower() for word in keywords or [] if word.strip()]
    if not needles:
        return sorted(discovered)

    return sorted(
        name
        for name in discovered
        if any(needle in name.lower() for needle in needles)
    )
|
|
|
|
|
|
def open_month_streams(tmp_dir: Path, year: int) -> dict[int, TextIO]:
    """Open one UTF-8 NDJSON file per month for writing.

    Args:
        tmp_dir: Existing directory that receives the files.
        year: Year used in the file names (``snapshot_<year>-<MM>.ndjson``).

    Returns:
        Mapping of month number (1-12) to its open text stream. The caller
        owns the streams and must close them.

    Raises:
        OSError: If any file cannot be opened. Streams opened before the
            failure are closed before the error propagates, so no handle
            leaks.
    """
    streams: dict[int, TextIO] = {}
    try:
        for month in range(1, 13):
            ndjson_path = tmp_dir / f"snapshot_{year}-{month:02d}.ndjson"
            streams[month] = ndjson_path.open("w", encoding="utf-8")
    except BaseException:
        # Cleanup only: release what was opened so far, then re-raise unchanged.
        for stream in streams.values():
            stream.close()
        raise
    return streams
|
|
|
|
|
|
def close_month_streams(streams: dict[int, TextIO]) -> None:
    """Close every stream in ``streams``, even if closing one of them fails.

    Args:
        streams: Mapping of month number to open text stream.

    Raises:
        Exception: The first error raised by a ``close()`` call, re-raised
            only after all remaining streams have been closed.
    """
    first_error: Exception | None = None
    for stream in streams.values():
        try:
            stream.close()
        except Exception as exc:
            # Keep going so one bad handle does not leave the others open.
            if first_error is None:
                first_error = exc
    if first_error is not None:
        raise first_error
|
|
|
|
|
|
def write_month_snapshot_json(
    *,
    output_path: Path,
    ndjson_path: Path,
    year: int,
    month: int,
    records_exported_total: int,
    links_exported_total: int,
    records_per_entity_set: dict[str, int],
    truncated_entity_sets: list[str],
    entity_sets_scanned_total: int,
    dated_records_total: int,
    undated_records_total: int,
) -> None:
    """Assemble one month's snapshot JSON document from its NDJSON file.

    The document is written piece by piece: a header object with the counters
    and the month window, then an ``items`` array. Each non-blank line of
    ``ndjson_path`` is copied verbatim as one array element, so the items are
    never all held in memory at once. ``status`` is always written as
    ``"success"``.
    """
    window_start, window_end = month_bounds(year, month)
    with output_path.open("w", encoding="utf-8") as out:
        out.writelines(
            [
                "{\n",
                ' "status": "success",\n',
                ' "snapshot_mode": "cumulative_asof",\n',
                f' "year": {year},\n',
                f' "month": "{month:02d}",\n',
                f' "selected_window_key": "{year}-{month:02d}",\n',
                f' "selected_window_start": "{window_start.isoformat()}",\n',
                f' "selected_window_end_exclusive": "{window_end.isoformat()}",\n',
                f' "records_exported_total": {records_exported_total},\n',
                f' "dated_records_exported_total": {dated_records_total},\n',
                f' "undated_records_exported_total": {undated_records_total},\n',
                f' "links_exported_total": {links_exported_total},\n',
                f' "entity_sets_scanned_total": {entity_sets_scanned_total},\n',
                ' "records_per_entity_set": ',
                json.dumps(records_per_entity_set, ensure_ascii=False, indent=2),
                ",\n",
                ' "truncated_entity_sets": ',
                json.dumps(truncated_entity_sets, ensure_ascii=False),
                ",\n",
                ' "items": [\n',
            ]
        )

        # Empty before the first element, then the separator that ends the
        # previous element.
        separator = ""
        with ndjson_path.open("r", encoding="utf-8") as source:
            for raw_line in source:
                item = raw_line.strip()
                if item:
                    out.write(f"{separator} {item}")
                    separator = ",\n"

        out.write("\n ]\n")
        out.write("}\n")
|
|
|
|
|
|
def detect_record_datetime(record: dict[str, Any]) -> datetime | None:
    """Find a timestamp for ``record``, or return ``None`` if there is none.

    ``parse_record_datetime`` is tried first. If it yields nothing, fields are
    scanned in the record's iteration order: a non-``None`` value whose
    lower-cased field name contains any ``DATE_FIELD_HINTS`` entry is passed to
    ``parse_dt`` as a string, and the first successful parse wins. Every
    result goes through ``normalize_dt``.
    """
    primary = parse_record_datetime(record)
    if primary is not None:
        return normalize_dt(primary)

    for key, raw_value in record.items():
        if raw_value is None:
            continue
        lowered_name = str(key).strip().lower()
        if any(hint in lowered_name for hint in DATE_FIELD_HINTS):
            candidate = parse_dt(str(raw_value))
            if candidate is not None:
                return normalize_dt(candidate)

    return None
|
|
|
|
|
|
def row_signature(row: dict[str, Any]) -> str:
    """Return a string that identifies ``row``.

    The first truthy value among the keys ``Ref_Key``, ``Ref``, ``ID``, ``Id``
    and ``id`` is returned as a string. If none is truthy, the result is the
    SHA-1 hex digest of the row serialized as compact JSON with sorted keys.
    """
    for id_key in ("Ref_Key", "Ref", "ID", "Id", "id"):
        identifier = row.get(id_key)
        if identifier:
            return str(identifier)

    canonical = json.dumps(row, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha1(canonical.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
def months_for_record(*, year: int, dt: datetime | None, include_undated: str) -> list[int]:
    """List the months of ``year`` whose cumulative snapshot includes a record.

    A record dated before ``year`` appears in all twelve months. A record
    dated within ``year`` appears from its own month through December. A
    record dated after ``year`` appears in none. An undated record
    (``dt is None``) appears in every month when ``include_undated`` is
    ``"all_months"`` and in none otherwise.
    """
    # A first month of 13 yields an empty range, i.e. "not included anywhere".
    if dt is None:
        first_month = 1 if include_undated == "all_months" else 13
    elif dt.year > year:
        first_month = 13
    elif dt.year < year:
        first_month = 1
    else:
        first_month = dt.month
    return list(range(first_month, 13))
|
|
|
|
|
|
def main() -> int:
    """Export twelve cumulative "as-of" monthly snapshots plus a manifest.

    Steps:
      1. Resolve the entity sets to scan (explicit list, keyword filter, or
         everything in the service metadata).
      2. Page through each entity set with ``$top``/``$skip``, map every row
         with ``map_record`` and append the serialized result to the NDJSON
         file of every month returned by ``months_for_record``.
      3. Convert each month's NDJSON file into a snapshot JSON document.
      4. Write a manifest with the per-month and per-entity-set statistics and
         print a short JSON summary to stdout.

    Returns:
        ``0`` on completion.

    Raises:
        RuntimeError: If no entity set could be resolved.
    """
    args = parse_args()
    settings = load_settings()
    client = ODataClient(settings)

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    tmp_dir = output_dir / "_tmp_ndjson"
    tmp_dir.mkdir(parents=True, exist_ok=True)

    entity_sets = resolve_entity_sets(
        client=client,
        explicit_sets=args.entity_set,
        keywords=args.keyword,
    )
    if not entity_sets:
        raise RuntimeError("No entity sets resolved for export")

    month_streams = open_month_streams(tmp_dir, args.year)
    month_records_per_set: dict[int, dict[str, int]] = {m: defaultdict(int) for m in range(1, 13)}
    month_records_total: dict[int, int] = {m: 0 for m in range(1, 13)}
    month_links_total: dict[int, int] = {m: 0 for m in range(1, 13)}
    month_dated_records_total: dict[int, int] = {m: 0 for m in range(1, 13)}
    month_undated_records_total: dict[int, int] = {m: 0 for m in range(1, 13)}

    entity_set_stats: dict[str, dict[str, Any]] = {}
    truncated_entity_sets: set[str] = set()
    started = time.time()
    written_records_total = 0
    # The written-record counter grows by 0..12 per source record, so an exact
    # "counter % N == 0" test would skip most multiples of N. Track the next
    # flush threshold instead.
    next_flush_at = args.flush_every_records

    try:
        for index, entity_set in enumerate(entity_sets, start=1):
            pages_read = 0
            records_scanned = 0
            records_mapped = 0
            records_with_datetime = 0
            records_without_datetime = 0
            records_skipped_future = 0
            mapping_errors = 0
            pagination_repeat_guard_hits = 0
            truncated = False

            prev_first_signature: str | None = None
            page_idx = 0

            while True:
                if args.max_pages_per_set > 0 and page_idx >= args.max_pages_per_set:
                    truncated = True
                    truncated_entity_sets.add(entity_set)
                    break

                skip = page_idx * args.page_size
                batch = client.read_entity_set_records(
                    entity_set,
                    top=args.page_size,
                    extra_params={"$skip": skip},
                )
                pages_read += 1
                page_idx += 1

                if not batch:
                    break

                if args.progress_pages_every > 0 and (page_idx % args.progress_pages_every == 0):
                    elapsed = round(time.time() - started, 2)
                    print(
                        json.dumps(
                            {
                                "page_progress": {
                                    "entity_set": entity_set,
                                    "entity_set_index": index,
                                    "entity_sets_total": len(entity_sets),
                                    "page": page_idx,
                                    "records_scanned_in_set": records_scanned,
                                    "elapsed_seconds": elapsed,
                                }
                            },
                            ensure_ascii=False,
                        )
                    )

                current_first_signature = row_signature(batch[0])
                if (
                    prev_first_signature is not None
                    and current_first_signature == prev_first_signature
                    and len(batch) == args.page_size
                ):
                    # Defensive guard against broken pagination that repeats the same page forever.
                    pagination_repeat_guard_hits += 1
                    truncated = True
                    truncated_entity_sets.add(entity_set)
                    break
                prev_first_signature = current_first_signature

                records_scanned += len(batch)
                for row in batch:
                    dt = detect_record_datetime(row)
                    if dt is None:
                        records_without_datetime += 1
                    else:
                        records_with_datetime += 1
                        if dt.year > args.year:
                            # Dated after the target year: not part of any snapshot.
                            records_skipped_future += 1
                            continue

                    try:
                        mapped = map_record(entity_set, row)
                    except Exception:
                        # Best effort: an unmappable row is counted in the
                        # manifest and does not abort the export.
                        mapping_errors += 1
                        continue

                    payload = mapped.model_dump()
                    encoded = json.dumps(payload, ensure_ascii=False)
                    months = months_for_record(year=args.year, dt=dt, include_undated=args.include_undated)

                    for month in months:
                        month_streams[month].write(encoded)
                        month_streams[month].write("\n")
                        month_records_total[month] += 1
                        month_links_total[month] += len(mapped.links)
                        month_records_per_set[month][entity_set] += 1
                        if dt is None:
                            month_undated_records_total[month] += 1
                        else:
                            month_dated_records_total[month] += 1

                    records_mapped += 1
                    written_records_total += len(months)
                    if args.flush_every_records > 0 and written_records_total >= next_flush_at:
                        for stream in month_streams.values():
                            stream.flush()
                        next_flush_at = written_records_total + args.flush_every_records

                # A short page means the entity set is exhausted.
                if len(batch) < args.page_size:
                    break

            entity_set_stats[entity_set] = {
                "pages_read": pages_read,
                "records_scanned": records_scanned,
                "records_mapped": records_mapped,
                "records_with_datetime": records_with_datetime,
                "records_without_datetime": records_without_datetime,
                "records_skipped_future": records_skipped_future,
                "mapping_errors": mapping_errors,
                "truncated_by_limit_or_guard": truncated,
                "pagination_repeat_guard_hits": pagination_repeat_guard_hits,
            }

            if args.progress_every > 0 and index % args.progress_every == 0:
                elapsed = round(time.time() - started, 2)
                print(
                    json.dumps(
                        {
                            "progress": {
                                "processed_entity_sets": index,
                                "entity_sets_total": len(entity_sets),
                                "elapsed_seconds": elapsed,
                                "records_exported_total_so_far": sum(month_records_total.values()),
                            }
                        },
                        ensure_ascii=False,
                    )
                )
    finally:
        # Close (and thereby flush) the NDJSON files whether or not the export succeeded.
        close_month_streams(month_streams)

    snapshot_files: list[str] = []
    for month in range(1, 13):
        ndjson_path = tmp_dir / f"snapshot_{args.year}-{month:02d}.ndjson"
        snapshot_path = output_dir / f"snapshot_{args.year}-{month:02d}_asof_full.json"
        write_month_snapshot_json(
            output_path=snapshot_path,
            ndjson_path=ndjson_path,
            year=args.year,
            month=month,
            records_exported_total=month_records_total[month],
            links_exported_total=month_links_total[month],
            records_per_entity_set=dict(sorted(month_records_per_set[month].items())),
            truncated_entity_sets=sorted(truncated_entity_sets),
            entity_sets_scanned_total=len(entity_sets),
            dated_records_total=month_dated_records_total[month],
            undated_records_total=month_undated_records_total[month],
        )
        snapshot_files.append(snapshot_path.name)

    manifest = {
        "status": "success",
        "snapshot_mode": "cumulative_asof",
        "year": args.year,
        "service_root": settings.service_root,
        "entity_sets_total": len(entity_sets),
        "entity_sets": entity_sets,
        "page_size": args.page_size,
        "max_pages_per_set": args.max_pages_per_set,
        "include_undated": args.include_undated,
        "truncated_entity_sets_count": len(truncated_entity_sets),
        "truncated_entity_sets": sorted(truncated_entity_sets),
        "records_exported_per_month": {
            f"{args.year}-{month:02d}": month_records_total[month] for month in range(1, 13)
        },
        "dated_records_exported_per_month": {
            f"{args.year}-{month:02d}": month_dated_records_total[month] for month in range(1, 13)
        },
        "undated_records_exported_per_month": {
            f"{args.year}-{month:02d}": month_undated_records_total[month] for month in range(1, 13)
        },
        "links_exported_per_month": {
            f"{args.year}-{month:02d}": month_links_total[month] for month in range(1, 13)
        },
        "entity_set_stats": entity_set_stats,
        "snapshot_files": snapshot_files,
        "output_dir": str(output_dir),
        "generated_at_utc": datetime.now(timezone.utc).isoformat(),
    }
    manifest_path = output_dir / f"manifest_{args.year}_monthly_company_asof_full.json"
    manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")

    summary = {
        "status": "success",
        "snapshot_mode": "cumulative_asof",
        "year": args.year,
        "output_dir": str(output_dir),
        "manifest_path": str(manifest_path),
        "snapshot_files_count": len(snapshot_files),
        "records_exported_total_across_month_files": sum(month_records_total.values()),
        "last_month_records_total": month_records_total[12],
        "truncated_entity_sets_count": len(truncated_entity_sets),
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
|
|
|
|
|
|
# Script entry point: run the export and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|