from __future__ import annotations import argparse from collections import defaultdict from datetime import datetime, timezone import json from pathlib import Path import sys from typing import Any PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from canonical_layer.mappers import map_record from canonical_layer.period_snapshot import normalize_dt, parse_dt, parse_record_datetime, window_bounds_from_key, window_key from config.client import ODataClient, extract_entity_sets from config.settings import LOGS_DIR, load_settings DEFAULT_KEYWORDS = [ "accountingregister", "accumulationregister", "document", "регистр", "документ", "хозрасчет", ] def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Find most active period before reporting deadline and export dense snapshot", ) parser.add_argument("--year", type=int, default=2020, help="Target reporting year") parser.add_argument( "--reporting-deadline", default=None, help="Inclusive scan upper bound in ISO format (default: March 31 next year 23:59:59 UTC)", ) parser.add_argument( "--granularity", choices=("month", "week"), default="month", help="Activity window granularity", ) parser.add_argument("--page-size", type=int, default=500, help="OData page size ($top)") parser.add_argument("--max-pages-per-set", type=int, default=200, help="Max pages to read per entity set") parser.add_argument( "--snapshot-max-records-per-set", type=int, default=5000, help="Maximum exported records per entity set in selected period", ) parser.add_argument( "--keyword", action="append", default=None, help="Entity-set keyword matcher (repeat flag for multiple values)", ) parser.add_argument( "--entity-set", action="append", default=None, help="Explicit entity set list (repeat flag for multiple values)", ) parser.add_argument( "--profile-output", default=None, help="Path to activity profile JSON", ) parser.add_argument( "--snapshot-output", default=None, help="Path to exported selected-period snapshot JSON", ) parser.add_argument("--strict", action="store_true", help="Fail when no active period is found") return parser.parse_args() def parse_deadline(year: int, raw: str | None) -> datetime: if raw: parsed = parse_dt(raw) if parsed is None: raise ValueError(f"Unable to parse --reporting-deadline={raw}") return parsed return datetime(year + 1, 3, 31, 23, 59, 59, tzinfo=timezone.utc) def resolve_entity_sets( *, client: ODataClient, explicit_sets: list[str] | None, keywords: list[str] | None, ) -> list[str]: if explicit_sets: return sorted({item.strip() for item in explicit_sets if item and item.strip()}) metadata_xml = client.fetch_metadata() entity_sets = extract_entity_sets(metadata_xml) names = [str(item.get("name", "")).strip() for item in entity_sets if str(item.get("name", "")).strip()] matcher = [item.strip().lower() for item in (keywords or DEFAULT_KEYWORDS) if item.strip()] matched = [name for name in names if any(token in name.lower() for token in matcher)] return sorted(set(matched)) def iter_paged_records( *, client: ODataClient, entity_set: str, page_size: int, max_pages: int, ) -> tuple[list[dict[str, Any]], dict[str, Any]]: records: list[dict[str, Any]] = [] pages = 0 truncated = False for page_index in range(max_pages): skip = page_index * page_size batch = client.read_entity_set_records(entity_set, top=page_size, extra_params={"$skip": skip}) pages += 1 if not batch: break records.extend(batch) if len(batch) < page_size: break else: truncated = True return records, {"pages_read": pages, "records_read": len(records), "truncated": truncated} def main() -> int: args = parse_args() settings = load_settings() client = ODataClient(settings) scan_start = datetime(args.year, 1, 1, 0, 0, 0, tzinfo=timezone.utc) scan_end = parse_deadline(args.year, args.reporting_deadline) if scan_end < scan_start: raise ValueError("reporting deadline cannot be earlier than scan start") entity_sets = resolve_entity_sets( client=client, explicit_sets=args.entity_set, keywords=args.keyword, ) if not entity_sets: raise RuntimeError("No entity sets resolved for period scan") window_counts: dict[str, int] = defaultdict(int) window_set_counts: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int)) set_scan_stats: dict[str, dict[str, Any]] = {} for entity_set in entity_sets: records, scan_stats = iter_paged_records( client=client, entity_set=entity_set, page_size=args.page_size, max_pages=args.max_pages_per_set, ) set_scan_stats[entity_set] = scan_stats for row in records: dt = parse_record_datetime(row) if dt is None: continue candidate = normalize_dt(dt) if candidate < scan_start or candidate > scan_end: continue key = window_key(candidate, granularity=args.granularity) window_counts[key] += 1 window_set_counts[key][entity_set] += 1 sorted_windows = sorted(window_counts.items(), key=lambda item: (-item[1], item[0])) selected_window_key = sorted_windows[0][0] if sorted_windows else None profile_output = Path(args.profile_output) if args.profile_output else LOGS_DIR / f"pre_report_activity_{args.year}.json" profile_output.parent.mkdir(parents=True, exist_ok=True) profile_payload: dict[str, Any] = { "status": "success" if selected_window_key else "no_data", "year": args.year, "scan_start": scan_start.isoformat(), "scan_end": scan_end.isoformat(), "reporting_deadline": scan_end.isoformat(), "granularity": args.granularity, "page_size": args.page_size, "max_pages_per_set": args.max_pages_per_set, "entity_sets_total": len(entity_sets), "entity_sets": entity_sets, "set_scan_stats": set_scan_stats, "windows": [ { "window_key": key, "records_total": total, "entity_set_counts": dict(sorted(window_set_counts[key].items(), key=lambda item: item[0])), } for key, total in sorted_windows ], "selected_window_key": selected_window_key, } if not selected_window_key: profile_output.write_text(json.dumps(profile_payload, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps(profile_payload, ensure_ascii=False, indent=2)) if args.strict: return 1 return 0 selected_start, selected_end = window_bounds_from_key(selected_window_key, granularity=args.granularity) profile_payload["selected_window_start"] = selected_start.isoformat() profile_payload["selected_window_end_exclusive"] = selected_end.isoformat() snapshot_entities: list[dict[str, Any]] = [] snapshot_set_counts: dict[str, int] = defaultdict(int) snapshot_links_total = 0 snapshot_truncated_sets: list[str] = [] for entity_set in entity_sets: records, _ = iter_paged_records( client=client, entity_set=entity_set, page_size=args.page_size, max_pages=args.max_pages_per_set, ) for row in records: dt = parse_record_datetime(row) if dt is None: continue candidate = normalize_dt(dt) if not (selected_start <= candidate < selected_end): continue if snapshot_set_counts[entity_set] >= args.snapshot_max_records_per_set: if entity_set not in snapshot_truncated_sets: snapshot_truncated_sets.append(entity_set) continue entity = map_record(entity_set, row) snapshot_entities.append(entity.model_dump()) snapshot_set_counts[entity_set] += 1 snapshot_links_total += len(entity.links) snapshot_output = ( Path(args.snapshot_output) if args.snapshot_output else LOGS_DIR / f"pre_report_snapshot_{args.year}_{selected_window_key}.json" ) snapshot_output.parent.mkdir(parents=True, exist_ok=True) snapshot_payload = { "status": "success", "year": args.year, "selected_window_key": selected_window_key, "selected_window_start": selected_start.isoformat(), "selected_window_end_exclusive": selected_end.isoformat(), "records_exported_total": len(snapshot_entities), "links_exported_total": snapshot_links_total, "records_per_entity_set": dict(sorted(snapshot_set_counts.items(), key=lambda item: item[0])), "truncated_entity_sets": sorted(snapshot_truncated_sets), "items": snapshot_entities, } profile_payload["snapshot_output"] = str(snapshot_output) profile_payload["snapshot_records_exported_total"] = len(snapshot_entities) profile_payload["snapshot_links_exported_total"] = snapshot_links_total profile_payload["snapshot_records_per_entity_set"] = dict(sorted(snapshot_set_counts.items(), key=lambda item: item[0])) profile_payload["snapshot_truncated_entity_sets"] = sorted(snapshot_truncated_sets) profile_output.write_text(json.dumps(profile_payload, ensure_ascii=False, indent=2), encoding="utf-8") snapshot_output.write_text(json.dumps(snapshot_payload, ensure_ascii=False, indent=2), encoding="utf-8") summary = { "status": "success", "year": args.year, "selected_window_key": selected_window_key, "selected_window_start": selected_start.isoformat(), "selected_window_end_exclusive": selected_end.isoformat(), "activity_records_in_window": int(window_counts[selected_window_key]), "snapshot_records_exported_total": len(snapshot_entities), "snapshot_links_exported_total": snapshot_links_total, "profile_output": str(profile_output), "snapshot_output": str(snapshot_output), } print(json.dumps(summary, ensure_ascii=False, indent=2)) return 0 if __name__ == "__main__": sys.exit(main())