# NODEDC_1C/scripts/export_arch_monthly_company...
#
# 472 lines
# 17 KiB
# Python

from __future__ import annotations
import argparse
import hashlib
import json
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
import sys
from typing import Any, TextIO
# PROJECT_ROOT is the parent of the directory that contains this file.
# It is put at the front of sys.path (once) so that the imports that follow
# (canonical_layer, config -- presumably project-local packages) resolve when
# this file is executed directly as a script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from canonical_layer.mappers import map_record
from canonical_layer.period_snapshot import normalize_dt, parse_dt, parse_record_datetime
from config.client import ODataClient, extract_entity_sets
from config.settings import load_settings
# Lower-case substrings used by detect_record_datetime() to decide whether a
# record field name looks like it holds a date/time value. Matching is a plain
# substring test against the lower-cased field name.
DATE_FIELD_HINTS = (
    # English hints
    "date",
    "period",
    "posted",
    "datetime",
    "timestamp",
    # Russian hints: "date", "period", "posted", "moment"
    "дата",
    "период",
    "проведен",
    "момент",
)
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the monthly snapshot exporter.

    Options are declared in a table and registered in order, so ``--help``
    lists them in the sequence shown below.

    Returns:
        The namespace produced by ``argparse`` for the process arguments.
    """
    default_output_dir = PROJECT_ROOT / "docs" / "ARCH" / "2020_monthly_company_asof_full"

    # (flag, add_argument keyword arguments) in registration order.
    option_table: list[tuple[str, dict[str, Any]]] = [
        ("--year", {"type": int, "default": 2020, "help": "Target year"}),
        (
            "--output-dir",
            {
                "default": str(default_output_dir),
                "help": "Directory where monthly snapshots will be written",
            },
        ),
        ("--page-size", {"type": int, "default": 500, "help": "OData $top page size"}),
        (
            "--max-pages-per-set",
            {
                "type": int,
                "default": 0,
                "help": "Max pages per entity set, 0 = unlimited",
            },
        ),
        (
            "--entity-set",
            {
                "action": "append",
                "default": None,
                "help": "Explicit entity set to include (repeat for multiple)",
            },
        ),
        (
            "--keyword",
            {
                "action": "append",
                "default": None,
                "help": "Filter entity sets by keyword (repeat for multiple)",
            },
        ),
        (
            "--include-undated",
            {
                "choices": ("all_months", "skip"),
                "default": "all_months",
                "help": "How to treat records without detected date",
            },
        ),
        (
            "--progress-every",
            {
                "type": int,
                "default": 10,
                "help": "Print progress every N entity sets",
            },
        ),
        (
            "--progress-pages-every",
            {
                "type": int,
                "default": 50,
                "help": "Print progress every N pages inside current entity set (0 to disable)",
            },
        ),
        (
            "--flush-every-records",
            {
                "type": int,
                "default": 5000,
                "help": "Flush NDJSON streams every N written records (0 to disable)",
            },
        ),
    ]

    cli = argparse.ArgumentParser(
        description="Export full cumulative monthly snapshots for company data",
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def month_bounds(year: int, month: int) -> tuple[datetime, datetime]:
    """Return the half-open UTC window ``[start, end)`` of a calendar month.

    Args:
        year: Calendar year of the window.
        month: Calendar month, 1 through 12.

    Returns:
        ``(start, end)`` where ``start`` is midnight UTC on the first day of
        the month and ``end`` is midnight UTC on the first day of the next
        month (exclusive upper bound).
    """
    window_start = datetime(year, month, 1, tzinfo=timezone.utc)
    # December rolls over into January of the following year.
    next_year, next_month = (year + 1, 1) if month == 12 else (year, month + 1)
    window_end = datetime(next_year, next_month, 1, tzinfo=timezone.utc)
    return window_start, window_end
def resolve_entity_sets(
    *,
    client: ODataClient,
    explicit_sets: list[str] | None,
    keywords: list[str] | None,
) -> list[str]:
    """Decide which entity sets the export should scan.

    Args:
        client: OData client; its metadata is fetched only when no explicit
            entity sets are given.
        explicit_sets: Entity set names supplied by the caller. When this list
            is non-empty it wins outright and ``keywords`` is ignored.
        keywords: Case-insensitive substrings; an entity set from the service
            metadata is kept when its name contains at least one of them.

    Returns:
        A sorted list of unique entity set names. With explicit names, blank
        entries are dropped and the rest are stripped. Without usable
        keywords, every entity set from the metadata is returned.
    """
    if explicit_sets:
        cleaned = {name.strip() for name in explicit_sets if name}
        cleaned.discard("")
        return sorted(cleaned)

    available = {entry["name"] for entry in extract_entity_sets(client.fetch_metadata())}

    needles = [word.strip().lower() for word in (keywords or []) if word.strip()]
    if not needles:
        return sorted(available)

    return sorted(
        name
        for name in available
        if any(needle in name.lower() for needle in needles)
    )
def open_month_streams(tmp_dir: Path, year: int) -> dict[int, TextIO]:
    """Open one writable NDJSON file per calendar month.

    Files are named ``snapshot_<year>-<MM>.ndjson`` inside ``tmp_dir`` and are
    opened in text mode (UTF-8), truncating any existing content.

    Args:
        tmp_dir: Existing directory that receives the twelve files.
        year: Year used in the file names.

    Returns:
        Mapping of month number (1..12) to its open text stream. The caller
        owns the streams and must close them.

    Raises:
        OSError: If any file cannot be opened. Streams opened before the
            failure are closed first, so no handles leak.
    """
    streams: dict[int, TextIO] = {}
    try:
        for month in range(1, 13):
            ndjson_path = tmp_dir / f"snapshot_{year}-{month:02d}.ndjson"
            streams[month] = ndjson_path.open("w", encoding="utf-8")
    except BaseException:
        # The caller never receives the partial mapping, so release the
        # handles that were already opened before propagating the error.
        for stream in streams.values():
            stream.close()
        raise
    return streams
def close_month_streams(streams: dict[int, TextIO]) -> None:
    """Close every stream in ``streams``.

    All streams are attempted even if closing one of them fails, so a single
    bad handle cannot leave the others open.

    Args:
        streams: Mapping of month number to open stream.

    Raises:
        Exception: The first error raised by a ``close()`` call, re-raised
            after all remaining streams have been closed.
    """
    first_error: Exception | None = None
    for stream in streams.values():
        try:
            stream.close()
        except Exception as exc:  # keep closing the rest, report afterwards
            if first_error is None:
                first_error = exc
    if first_error is not None:
        raise first_error
def write_month_snapshot_json(
    *,
    output_path: Path,
    ndjson_path: Path,
    year: int,
    month: int,
    records_exported_total: int,
    links_exported_total: int,
    records_per_entity_set: dict[str, int],
    truncated_entity_sets: list[str],
    entity_sets_scanned_total: int,
    dated_records_total: int,
    undated_records_total: int,
) -> None:
    """Assemble one monthly snapshot JSON document from its NDJSON file.

    The document is written by hand rather than with a single ``json.dump``
    so the items are streamed line by line from ``ndjson_path`` and never
    held in memory together. Each non-blank NDJSON line is copied verbatim as
    one element of the ``"items"`` array.

    Args:
        output_path: Destination JSON file (overwritten).
        ndjson_path: Source file with one JSON object per line.
        year: Snapshot year.
        month: Snapshot month, 1 through 12.
        records_exported_total: Value for ``records_exported_total``.
        links_exported_total: Value for ``links_exported_total``.
        records_per_entity_set: Per-entity-set record counts for this month.
        truncated_entity_sets: Names of entity sets that were cut short.
        entity_sets_scanned_total: Number of entity sets scanned.
        dated_records_total: Value for ``dated_records_exported_total``.
        undated_records_total: Value for ``undated_records_exported_total``.
    """
    window_start, window_end = month_bounds(year, month)
    month_tag = f"{month:02d}"

    with output_path.open("w", encoding="utf-8") as out:
        per_set_json = json.dumps(records_per_entity_set, ensure_ascii=False, indent=2)
        truncated_json = json.dumps(truncated_entity_sets, ensure_ascii=False)
        header_lines = [
            "{",
            ' "status": "success",',
            ' "snapshot_mode": "cumulative_asof",',
            f' "year": {year},',
            f' "month": "{month_tag}",',
            f' "selected_window_key": "{year}-{month_tag}",',
            f' "selected_window_start": "{window_start.isoformat()}",',
            f' "selected_window_end_exclusive": "{window_end.isoformat()}",',
            f' "records_exported_total": {records_exported_total},',
            f' "dated_records_exported_total": {dated_records_total},',
            f' "undated_records_exported_total": {undated_records_total},',
            f' "links_exported_total": {links_exported_total},',
            f' "entity_sets_scanned_total": {entity_sets_scanned_total},',
            ' "records_per_entity_set": ' + per_set_json + ",",
            ' "truncated_entity_sets": ' + truncated_json + ",",
            ' "items": [',
        ]
        out.write("\n".join(header_lines) + "\n")

        # Empty before the first item, ",\n" before every later one, so the
        # array never ends with a trailing comma.
        separator = ""
        with ndjson_path.open("r", encoding="utf-8") as source:
            for raw_line in source:
                item = raw_line.strip()
                if item:
                    out.write(f"{separator} {item}")
                    separator = ",\n"

        out.write("\n ]\n}\n")
def detect_record_datetime(record: dict[str, Any]) -> datetime | None:
    """Find a datetime for ``record``, or return ``None`` when none is found.

    ``parse_record_datetime`` is tried first. If it yields nothing, the
    record's fields are scanned in order: a field qualifies when its value is
    not ``None`` and its lower-cased name contains one of
    ``DATE_FIELD_HINTS``. The first qualifying value that ``parse_dt`` can
    parse wins.

    Args:
        record: Raw record as a field-name to value mapping.

    Returns:
        The detected datetime passed through ``normalize_dt``, or ``None``.
    """
    primary = parse_record_datetime(record)
    if primary is not None:
        return normalize_dt(primary)

    for key, raw_value in record.items():
        if raw_value is None:
            continue
        lowered_name = str(key).strip().lower()
        looks_like_date = any(hint in lowered_name for hint in DATE_FIELD_HINTS)
        if looks_like_date:
            candidate = parse_dt(str(raw_value))
            if candidate is not None:
                return normalize_dt(candidate)

    return None
def row_signature(row: dict[str, Any]) -> str:
    """Return a string that identifies ``row``.

    The first truthy value among the keys ``Ref_Key``, ``Ref``, ``ID``,
    ``Id`` and ``id`` (checked in that order) is returned as a string. When
    none of them is present or truthy, the SHA-1 hex digest of the row's
    compact, key-sorted JSON encoding is returned instead.

    Args:
        row: Raw record as a field-name to value mapping.

    Returns:
        The identifier string or the 40-character hex digest.
    """
    for id_key in ("Ref_Key", "Ref", "ID", "Id", "id"):
        identifier = row.get(id_key)
        if identifier:
            return str(identifier)

    canonical = json.dumps(
        row,
        ensure_ascii=False,
        sort_keys=True,
        separators=(",", ":"),
    )
    digest = hashlib.sha1(canonical.encode("utf-8"))
    return digest.hexdigest()
def months_for_record(*, year: int, dt: datetime | None, include_undated: str) -> list[int]:
    """List the months of ``year`` whose cumulative snapshot includes a record.

    Args:
        year: Target year of the export.
        dt: Datetime detected for the record, or ``None`` when undated.
        include_undated: ``"all_months"`` puts undated records into every
            month; any other value leaves them out.

    Returns:
        Month numbers in ascending order: all twelve for records dated before
        ``year``, from the record's month through December for records dated
        within ``year``, and an empty list for records dated after ``year``.
    """
    if dt is None:
        return list(range(1, 13)) if include_undated == "all_months" else []
    if dt.year > year:
        return []
    # Earlier years are visible from January; same-year records from their month.
    first_month = dt.month if dt.year == year else 1
    return list(range(first_month, 13))
def main() -> int:
    """Export cumulative ("as of") monthly snapshots for one year.

    Steps:
      1. Parse CLI options, load settings and build the OData client.
      2. Resolve the entity sets to scan.
      3. Page through every entity set with ``$top``/``$skip``, map each row
         with ``map_record`` and append the encoded record to the NDJSON
         stream of every month returned by ``months_for_record``.
      4. Convert each month's NDJSON file into a snapshot JSON document.
      5. Write a manifest file and print a JSON summary to stdout.

    Progress is reported as JSON objects printed to stdout.

    Returns:
        ``0`` on success.

    Raises:
        ValueError: If ``--page-size`` is not a positive integer.
        RuntimeError: If no entity sets could be resolved.
    """
    args = parse_args()
    if args.page_size < 1:
        # With a non-positive page size $skip never advances and neither the
        # short-page check nor the repeat guard can end the paging loop.
        raise ValueError("--page-size must be a positive integer")

    settings = load_settings()
    client = ODataClient(settings)

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    tmp_dir = output_dir / "_tmp_ndjson"
    tmp_dir.mkdir(parents=True, exist_ok=True)

    entity_sets = resolve_entity_sets(
        client=client,
        explicit_sets=args.entity_set,
        keywords=args.keyword,
    )
    if not entity_sets:
        raise RuntimeError("No entity sets resolved for export")

    months_of_year = range(1, 13)
    # "YYYY-MM" label per month, reused for file names and manifest keys.
    month_keys = {m: f"{args.year}-{m:02d}" for m in months_of_year}

    month_records_per_set: dict[int, dict[str, int]] = {m: defaultdict(int) for m in months_of_year}
    month_records_total: dict[int, int] = {m: 0 for m in months_of_year}
    month_links_total: dict[int, int] = {m: 0 for m in months_of_year}
    month_dated_records_total: dict[int, int] = {m: 0 for m in months_of_year}
    month_undated_records_total: dict[int, int] = {m: 0 for m in months_of_year}
    entity_set_stats: dict[str, dict[str, Any]] = {}
    truncated_entity_sets: set[str] = set()
    started = time.time()
    # Records written since the last flush. A plain modulo on a running total
    # does not work here because the total grows by 0..12 per source record
    # and usually jumps over exact multiples of the threshold.
    unflushed_records = 0

    month_streams = open_month_streams(tmp_dir, args.year)
    try:
        for index, entity_set in enumerate(entity_sets, start=1):
            pages_read = 0
            records_scanned = 0
            records_mapped = 0
            records_with_datetime = 0
            records_without_datetime = 0
            records_skipped_future = 0
            mapping_errors = 0
            pagination_repeat_guard_hits = 0
            truncated = False
            prev_first_signature: str | None = None
            page_idx = 0

            while True:
                if args.max_pages_per_set > 0 and page_idx >= args.max_pages_per_set:
                    truncated = True
                    truncated_entity_sets.add(entity_set)
                    break

                skip = page_idx * args.page_size
                batch = client.read_entity_set_records(
                    entity_set,
                    top=args.page_size,
                    extra_params={"$skip": skip},
                )
                pages_read += 1
                page_idx += 1
                if not batch:
                    break

                if args.progress_pages_every > 0 and (page_idx % args.progress_pages_every == 0):
                    elapsed = round(time.time() - started, 2)
                    print(
                        json.dumps(
                            {
                                "page_progress": {
                                    "entity_set": entity_set,
                                    "entity_set_index": index,
                                    "entity_sets_total": len(entity_sets),
                                    "page": page_idx,
                                    "records_scanned_in_set": records_scanned,
                                    "elapsed_seconds": elapsed,
                                }
                            },
                            ensure_ascii=False,
                        )
                    )

                current_first_signature = row_signature(batch[0])
                if (
                    prev_first_signature is not None
                    and current_first_signature == prev_first_signature
                    and len(batch) == args.page_size
                ):
                    # Defensive guard against broken pagination that repeats the same page forever.
                    pagination_repeat_guard_hits += 1
                    truncated = True
                    truncated_entity_sets.add(entity_set)
                    break
                prev_first_signature = current_first_signature

                records_scanned += len(batch)
                for row in batch:
                    dt = detect_record_datetime(row)
                    if dt is None:
                        records_without_datetime += 1
                    else:
                        records_with_datetime += 1
                        if dt.year > args.year:
                            records_skipped_future += 1
                            continue

                    try:
                        mapped = map_record(entity_set, row)
                    except Exception:
                        # Best effort: one unmappable row must not abort the
                        # export; the count is reported in the manifest.
                        mapping_errors += 1
                        continue

                    payload = mapped.model_dump()
                    encoded = json.dumps(payload, ensure_ascii=False)
                    months = months_for_record(
                        year=args.year,
                        dt=dt,
                        include_undated=args.include_undated,
                    )
                    for month in months:
                        month_streams[month].write(encoded)
                        month_streams[month].write("\n")
                        month_records_total[month] += 1
                        month_links_total[month] += len(mapped.links)
                        month_records_per_set[month][entity_set] += 1
                        if dt is None:
                            month_undated_records_total[month] += 1
                        else:
                            month_dated_records_total[month] += 1
                    records_mapped += 1

                    unflushed_records += len(months)
                    if 0 < args.flush_every_records <= unflushed_records:
                        for stream in month_streams.values():
                            stream.flush()
                        unflushed_records = 0

                # A short page means the entity set is exhausted.
                if len(batch) < args.page_size:
                    break

            entity_set_stats[entity_set] = {
                "pages_read": pages_read,
                "records_scanned": records_scanned,
                "records_mapped": records_mapped,
                "records_with_datetime": records_with_datetime,
                "records_without_datetime": records_without_datetime,
                "records_skipped_future": records_skipped_future,
                "mapping_errors": mapping_errors,
                "truncated_by_limit_or_guard": truncated,
                "pagination_repeat_guard_hits": pagination_repeat_guard_hits,
            }

            if args.progress_every > 0 and index % args.progress_every == 0:
                elapsed = round(time.time() - started, 2)
                print(
                    json.dumps(
                        {
                            "progress": {
                                "processed_entity_sets": index,
                                "entity_sets_total": len(entity_sets),
                                "elapsed_seconds": elapsed,
                                "records_exported_total_so_far": sum(month_records_total.values()),
                            }
                        },
                        ensure_ascii=False,
                    )
                )
    finally:
        # Always release the twelve file handles, also when the scan fails.
        close_month_streams(month_streams)

    snapshot_files: list[str] = []
    for month in months_of_year:
        ndjson_path = tmp_dir / f"snapshot_{month_keys[month]}.ndjson"
        snapshot_path = output_dir / f"snapshot_{month_keys[month]}_asof_full.json"
        write_month_snapshot_json(
            output_path=snapshot_path,
            ndjson_path=ndjson_path,
            year=args.year,
            month=month,
            records_exported_total=month_records_total[month],
            links_exported_total=month_links_total[month],
            records_per_entity_set=dict(sorted(month_records_per_set[month].items())),
            truncated_entity_sets=sorted(truncated_entity_sets),
            entity_sets_scanned_total=len(entity_sets),
            dated_records_total=month_dated_records_total[month],
            undated_records_total=month_undated_records_total[month],
        )
        snapshot_files.append(snapshot_path.name)

    manifest = {
        "status": "success",
        "snapshot_mode": "cumulative_asof",
        "year": args.year,
        "service_root": settings.service_root,
        "entity_sets_total": len(entity_sets),
        "entity_sets": entity_sets,
        "page_size": args.page_size,
        "max_pages_per_set": args.max_pages_per_set,
        "include_undated": args.include_undated,
        "truncated_entity_sets_count": len(truncated_entity_sets),
        "truncated_entity_sets": sorted(truncated_entity_sets),
        "records_exported_per_month": {
            month_keys[month]: month_records_total[month] for month in months_of_year
        },
        "dated_records_exported_per_month": {
            month_keys[month]: month_dated_records_total[month] for month in months_of_year
        },
        "undated_records_exported_per_month": {
            month_keys[month]: month_undated_records_total[month] for month in months_of_year
        },
        "links_exported_per_month": {
            month_keys[month]: month_links_total[month] for month in months_of_year
        },
        "entity_set_stats": entity_set_stats,
        "snapshot_files": snapshot_files,
        "output_dir": str(output_dir),
        "generated_at_utc": datetime.now(timezone.utc).isoformat(),
    }
    manifest_path = output_dir / f"manifest_{args.year}_monthly_company_asof_full.json"
    manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")

    summary = {
        "status": "success",
        "snapshot_mode": "cumulative_asof",
        "year": args.year,
        "output_dir": str(output_dir),
        "manifest_path": str(manifest_path),
        "snapshot_files_count": len(snapshot_files),
        "records_exported_total_across_month_files": sum(month_records_total.values()),
        "last_month_records_total": month_records_total[12],
        "truncated_entity_sets_count": len(truncated_entity_sets),
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
# Script entry point: run the export and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())