NODEDC_1C/canonical_layer/period_snapshot.py

96 lines
2.9 KiB
Python

from __future__ import annotations
from datetime import datetime, timedelta, timezone
import re
from typing import Any
# Captures the signed millisecond count inside an OData-style "/Date(<millis>" token.
ODATA_DATE_RE = re.compile(r"/Date\((?P<millis>-?\d+)")

# Record keys probed, in this order, when searching a record for a timestamp.
DATE_CANDIDATES = ("Date", "Дата", "Period", "Период", "PostedAt", "posted_at")


def normalize_dt(value: datetime) -> datetime:
    """Return *value* as a timezone-aware datetime expressed in UTC.

    A naive datetime is tagged with the UTC zone without shifting its clock
    fields; an aware datetime is converted to UTC.
    """
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


def parse_dt(raw: str | None) -> datetime | None:
    """Parse a textual timestamp into a timezone-aware UTC datetime.

    Two formats are recognised:

    * an OData-style ``/Date(<millis>`` token, where ``<millis>`` is a signed
      count of milliseconds since the Unix epoch;
    * a string accepted by ``datetime.fromisoformat``; the text is first tried
      with every ``Z`` replaced by ``+00:00``, then unchanged.

    Args:
        raw: Text to parse, or ``None``.

    Returns:
        The parsed moment in UTC, or ``None`` when *raw* is ``None``, blank,
        unparseable, or denotes a moment outside the range ``datetime`` can
        represent.
    """
    if raw is None:
        return None
    text = raw.strip()
    if not text:
        return None

    match = ODATA_DATE_RE.search(text)
    if match:
        millis = int(match.group("millis"))
        # Integer arithmetic from the epoch rather than ``fromtimestamp``:
        # exact to the millisecond, independent of the platform's handling of
        # negative timestamps, and out-of-range input surfaces only as
        # OverflowError, which is mapped to the "unparseable" result.
        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        try:
            return epoch + timedelta(milliseconds=millis)
        except OverflowError:
            return None

    for candidate in (text.replace("Z", "+00:00"), text):
        try:
            return normalize_dt(datetime.fromisoformat(candidate))
        except ValueError:
            continue
    return None
def parse_record_datetime(record: dict[str, Any]) -> datetime | None:
    """Return the first parseable timestamp found in *record*.

    The keys listed in ``DATE_CANDIDATES`` are probed in order. Missing or
    ``None`` values are skipped; every other value is converted with ``str``
    and handed to ``parse_dt``.

    Args:
        record: Mapping to search for a timestamp.

    Returns:
        The first value ``parse_dt`` could parse, or ``None`` if no candidate
        key yields one.
    """
    # Both generators are lazy, so probing stops at the first successful parse.
    present = (record.get(name) for name in DATE_CANDIDATES)
    attempts = (parse_dt(str(item)) for item in present if item is not None)
    return next((moment for moment in attempts if moment is not None), None)
def first_day_of_month(value: datetime) -> datetime:
    """Return midnight on the first day of *value*'s month.

    *value* is passed through ``normalize_dt`` first, so the month is the one
    the moment falls in after that normalization.
    """
    midnight = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
    return normalize_dt(value).replace(day=1, **midnight)
def month_bounds(value: datetime) -> tuple[datetime, datetime]:
    """Return the start of *value*'s month and the start of the next month.

    The first element comes from ``first_day_of_month``; the second is the
    same instant one calendar month later, rolling over to January of the
    following year when the month is December.
    """
    month_start = first_day_of_month(value)
    is_december = month_start.month == 12
    next_month_start = month_start.replace(
        year=month_start.year + 1 if is_december else month_start.year,
        month=1 if is_december else month_start.month + 1,
    )
    return month_start, next_month_start
def iso_week_bounds(value: datetime) -> tuple[datetime, datetime]:
    """Return Monday midnight of *value*'s week and the Monday seven days later.

    *value* is passed through ``normalize_dt`` before the time of day is
    cleared, so the week is determined after that normalization.
    """
    moment = normalize_dt(value)
    midnight = moment.replace(hour=0, minute=0, second=0, microsecond=0)
    # ``weekday()`` is 0 on Monday, so this steps back to the week's Monday.
    monday = midnight - timedelta(days=midnight.weekday())
    following_monday = monday + timedelta(days=7)
    return monday, following_monday
def window_key(value: datetime, *, granularity: str) -> str:
    """Return the textual key of the window that contains *value*.

    *value* is passed through ``normalize_dt`` before the key is derived.

    Args:
        value: Moment to classify.
        granularity: ``"month"`` for a ``strftime("%Y-%m")`` key, or
            ``"week"`` for an ISO-calendar ``<year>-W<week>`` key with the
            week number zero-padded to two digits.

    Raises:
        ValueError: If *granularity* is neither ``"month"`` nor ``"week"``.
    """
    moment = normalize_dt(value)
    if granularity == "week":
        iso_year, iso_week, _weekday = moment.isocalendar()
        return f"{iso_year}-W{iso_week:02d}"
    if granularity == "month":
        return moment.strftime("%Y-%m")
    raise ValueError(f"Unsupported granularity: {granularity}")
def window_bounds_from_key(key: str, *, granularity: str) -> tuple[datetime, datetime]:
    """Return the start of the window named by *key* and the start of the next one.

    Args:
        key: A ``%Y-%m`` string for ``"month"`` granularity, or a
            ``<year>-W<week>`` string for ``"week"`` granularity.
        granularity: ``"month"`` or ``"week"``.

    Returns:
        ``(start, end)`` as produced by ``normalize_dt``: for months, the
        first day of the month and of the following month; for weeks, the
        ISO week's Monday and the Monday seven days later.

    Raises:
        ValueError: If *granularity* is unsupported or *key* cannot be parsed
            for the requested granularity.
    """
    if granularity not in ("month", "week"):
        raise ValueError(f"Unsupported granularity: {granularity}")

    if granularity == "week":
        year_text, week_text = key.split("-W")
        # ISO weekday 1 is Monday, the first day of the ISO week.
        monday = normalize_dt(datetime.fromisocalendar(int(year_text), int(week_text), 1))
        return monday, monday + timedelta(days=7)

    month_start = normalize_dt(datetime.strptime(key, "%Y-%m"))
    if month_start.month < 12:
        return month_start, month_start.replace(month=month_start.month + 1)
    # December rolls over into January of the following year.
    return month_start, month_start.replace(year=month_start.year + 1, month=1)