96 lines
2.9 KiB
Python
96 lines
2.9 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
import re
|
|
from typing import Any
|
|
|
|
|
|
ODATA_DATE_RE = re.compile(r"/Date\((?P<millis>-?\d+)")
|
|
DATE_CANDIDATES = ("Date", "Дата", "Period", "Период", "PostedAt", "posted_at")
|
|
|
|
|
|
def normalize_dt(value: datetime) -> datetime:
|
|
if value.tzinfo is None:
|
|
return value.replace(tzinfo=timezone.utc)
|
|
return value.astimezone(timezone.utc)
|
|
|
|
|
|
def parse_dt(raw: str | None) -> datetime | None:
|
|
if raw is None:
|
|
return None
|
|
text = raw.strip()
|
|
if not text:
|
|
return None
|
|
|
|
match = ODATA_DATE_RE.search(text)
|
|
if match:
|
|
millis = int(match.group("millis"))
|
|
return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
|
|
|
|
for candidate in (text.replace("Z", "+00:00"), text):
|
|
try:
|
|
return normalize_dt(datetime.fromisoformat(candidate))
|
|
except ValueError:
|
|
continue
|
|
return None
|
|
|
|
|
|
def parse_record_datetime(record: dict[str, Any]) -> datetime | None:
|
|
for field in DATE_CANDIDATES:
|
|
value = record.get(field)
|
|
if value is None:
|
|
continue
|
|
parsed = parse_dt(str(value))
|
|
if parsed is not None:
|
|
return parsed
|
|
return None
|
|
|
|
|
|
def first_day_of_month(value: datetime) -> datetime:
|
|
normalized = normalize_dt(value)
|
|
return normalized.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
|
|
|
|
|
|
def month_bounds(value: datetime) -> tuple[datetime, datetime]:
|
|
start = first_day_of_month(value)
|
|
if start.month == 12:
|
|
end = start.replace(year=start.year + 1, month=1)
|
|
else:
|
|
end = start.replace(month=start.month + 1)
|
|
return start, end
|
|
|
|
|
|
def iso_week_bounds(value: datetime) -> tuple[datetime, datetime]:
|
|
normalized = normalize_dt(value).replace(hour=0, minute=0, second=0, microsecond=0)
|
|
start = normalized - timedelta(days=normalized.weekday())
|
|
end = start + timedelta(days=7)
|
|
return start, end
|
|
|
|
|
|
def window_key(value: datetime, *, granularity: str) -> str:
|
|
dt = normalize_dt(value)
|
|
if granularity == "month":
|
|
return dt.strftime("%Y-%m")
|
|
if granularity == "week":
|
|
year, week, _ = dt.isocalendar()
|
|
return f"{year}-W{week:02d}"
|
|
raise ValueError(f"Unsupported granularity: {granularity}")
|
|
|
|
|
|
def window_bounds_from_key(key: str, *, granularity: str) -> tuple[datetime, datetime]:
|
|
if granularity == "month":
|
|
start = normalize_dt(datetime.strptime(key, "%Y-%m"))
|
|
if start.month == 12:
|
|
end = start.replace(year=start.year + 1, month=1)
|
|
else:
|
|
end = start.replace(month=start.month + 1)
|
|
return start, end
|
|
if granularity == "week":
|
|
year_raw, week_raw = key.split("-W")
|
|
year = int(year_raw)
|
|
week = int(week_raw)
|
|
start = normalize_dt(datetime.fromisocalendar(year, week, 1))
|
|
end = start + timedelta(days=7)
|
|
return start, end
|
|
raise ValueError(f"Unsupported granularity: {granularity}")
|