1842 lines
60 KiB
Python
1842 lines
60 KiB
Python
import hmac
import json
import os
import re
import uuid
from contextlib import closing
from datetime import datetime, timedelta, timezone

import httpx
import psycopg2
from fastapi import Body, Depends, FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
|
||
|
||
# ASGI application; the CORS middleware configured below is attached to it.
app = FastAPI()

# --- Environment-driven configuration (read once, at import time) ---

# Postgres connection string; require_db_url() checks it before every query.
DATABASE_URL = os.environ.get("DATABASE_URL")
# Shared secret expected in the "x-api-key" header; an empty value disables
# the check entirely (see require_api_key).
API_KEY = os.environ.get("API_KEY", "").strip()
# Freshness windows used by classify_station(): a station is "in flow" when it
# ingested anything within INGESTED_WINDOW_MINUTES; an in-flow station's last
# observation is then "fresh" within OBSERVED_FRESH_MINUTES, "recent" within
# OBSERVED_RECENT_HOURS, and "stale" otherwise.
INGESTED_WINDOW_MINUTES = 30
OBSERVED_FRESH_MINUTES = 60
OBSERVED_RECENT_HOURS = 4
# Comma-separated list of allowed browser origins; blank entries are dropped.
CORS_ORIGINS = [
    origin.strip()
    for origin in os.environ.get("CORS_ORIGINS", "http://localhost:5173").split(",")
    if origin.strip()
]

# Fixed UTC+3 offset used for local day boundaries (day_bounds_utc,
# local_date_key) and for timestamps shown to the LLM (format_llm_timestamp).
LOCAL_TZ = timezone(timedelta(hours=3))
# IANA zone name reported as ui.range.tz in snapshots and tool range args.
UI_TZ = "Europe/Moscow"

# OpenAI-compatible server; call_llm() posts to {LLM_BASE_URL}/v1/chat/completions.
LLM_BASE_URL = os.environ.get("LLM_BASE_URL", "").rstrip("/")
LLM_MODEL = os.environ.get("LLM_MODEL", "saiga_nemo_12b_gguf")
# httpx timeout (seconds) for each LLM request.
LLM_TIMEOUT = float(os.environ.get("LLM_TIMEOUT", "60"))
# Upper bound on stations placed in a snapshot (select_llm_stations).
LLM_MAX_STATIONS = int(os.environ.get("LLM_MAX_STATIONS", "30"))

# --- Limits (advertised to the LLM in snapshot_v1["limits"]) ---
# NOTE(review): in the visible code only MAX_HISTORY_POINTS and
# MIN_HISTORY_STEP_SECONDS (compute_history_step) and MAX_ANOMALIES
# (apply_fallbacks) are used directly; the others are presumably enforced by
# tool handlers elsewhere in the file — confirm.
MAX_TOOL_ITERATIONS = 3
MAX_RANGE_HOURS_SNAPSHOT = 72
MAX_RANGE_DAYS_ANOMALIES = 14
MAX_RANGE_DAYS_HISTORY = 31
MAX_STATIONS_IN_SNAPSHOT = LLM_MAX_STATIONS
MAX_ANOMALIES = 100
# Not referenced in this chunk; presumably caps for anomaly text in answers.
MAX_ANOMALIES_ANSWER = 10
MAX_ANOMALIES_BRIEF = 3
MAX_ANOMALIES_FULL_LIST = 30
MAX_HISTORY_POINTS = 500
MIN_HISTORY_STEP_SECONDS = 300

# Tool names the router may request; also the "tool" enum in router_response_format().
TOOLS_WHITELIST = ["get_snapshot", "get_anomalies", "get_station_history"]
# Allowed request_type values (router schema enum and classify_message() output).
REQUEST_TYPES = ["smalltalk", "meta", "eco"]
|
||
|
||
# Keyword lists matched against the lower-cased user message. Entries are
# mostly stems (e.g. "аномал", "подозр") so inflected forms also match.

# The message is about anomalies at all. Used by classify_message() (forces
# "eco") and as the gate in wants_anomaly_details()/wants_anomaly_full_list().
ANOMALY_TOPIC_TRIGGERS = [
    "аномал",
    "всплеск",
    "скач",
    "скачок",
    "пик",
    "пики",
    "залип",
    "застрял",
    "подозр",
    "резк",
    "spike",
    "stuck",
    "outlier",
    "flatline",
    "stale",
    "not updating",
]
# Together with an anomaly topic word: the user wants concrete anomaly records
# (wants_anomaly_details -> apply_fallbacks forces a get_anomalies call).
ANOMALY_DETAIL_TRIGGERS = [
    "перечисли",
    "список",
    "покажи все",
    "подроб",
    "подтверж",
    "значени",
    "дельта",
    "по станциям",
    "когда",
    "таймстемп",
    "evidence",
    "timestamps",
    "details",
    "list",
    "show all",
    "where",
    "which",
]
# Together with an anomaly topic word: the user wants the full list
# (wants_anomaly_full_list).
ANOMALY_FULL_LIST_TRIGGERS = [
    "полный список",
    "покажи все",
    "перечисли все",
    "все аномалии",
    "все всплески",
    "покажи полный",
    "show all anomalies",
    "full list",
]
# The user asks for chronological ordering (wants_anomaly_chrono).
ANOMALY_TIME_ORDER_TRIGGERS = [
    "по времени",
    "хронолог",
    "по порядку",
    "сначала",
    "по хронологии",
    "chronolog",
]
# Time-series / history requests. Used by classify_message() (forces "eco")
# and wants_history() (get_station_history fallback when a station is known).
HISTORY_TRIGGERS = [
    "истор",
    "график",
    "динамик",
    "тренд",
    "таймлайн",
    "по часам",
    "по минутам",
    "за период",
    "серия",
    "кривая",
    "trend",
    "history",
    "timeline",
    "time series",
    "series",
    "chart",
    "plot",
    "over time",
]
# Greetings, acknowledgements and connectivity checks; consumed by
# classify_message() for the "smalltalk" request type.
SMALLTALK_TRIGGERS = [
    "привет",
    "здрав",
    "добрый",
    "как дела",
    "спасибо",
    "спс",
    "ок",
    "окей",
    "оке",
    "лол",
    "тест",
    "провер",
    "работает",
    "напиши фразу",
    "напиши слово",
    "угу",
    "ага",
    "ясно",
    "понятно",
    "ладно",
    "алло",
    "ты тут",
    "hi",
    "hello",
    "thanks",
    "ok",
    "test",
    "write a phrase",
    "are you there",
    "ping",
]
# Questions about the assistant/system itself; consumed by classify_message()
# for the "meta" request type (checked after the eco lists, before smalltalk).
META_TRIGGERS = [
    "почему ты",
    "как работает",
    "что такое",
    "объясни",
    "tool",
    "snapshot",
    "scope",
    "evidence",
    "prompt",
    "model",
    "llm",
    "system",
    "tools",
    "оркестратор",
    "router",
    "answer",
    "контекст",
    "json",
    "schema",
    "endpoint",
    "api",
    "backend",
]
# Air-quality / station / data-freshness vocabulary; any hit makes
# classify_message() return "eco".
ECO_TRIGGERS = [
    "aqi",
    "air quality",
    "воздух",
    "качество воздуха",
    "станц",
    "измерен",
    "свеж",
    "данные",
    "ingest",
    "observed",
    "fresh",
    "recent",
    "stale",
    "pm2.5",
    "pm10",
    "anomal",
    "station",
    "history",
    "timeline",
]
|
||
|
||
# Attach CORS handling only when at least one origin is configured.
if CORS_ORIGINS:
    # A wildcard origin must never be combined with credentials. Browsers
    # reject "Access-Control-Allow-Origin: *" on credentialed responses, and
    # Starlette's CORSMiddleware reflects the caller's Origin back when "*" is
    # listed together with allow_credentials=True, which would let any site
    # make credentialed requests. The previous check only caught the exact
    # list ["*"]; this one catches "*" anywhere in the list.
    allow_credentials = "*" not in CORS_ORIGINS
    app.add_middleware(
        CORSMiddleware,
        allow_origins=CORS_ORIGINS,
        allow_credentials=allow_credentials,
        allow_methods=["*"],
        allow_headers=["*"],
    )
|
||
|
||
|
||
def require_db_url():
    """Guard every database helper against a missing connection string.

    Raises:
        RuntimeError: when DATABASE_URL was empty or unset at import time.
    """
    if DATABASE_URL:
        return
    raise RuntimeError("DATABASE_URL is not set")
|
||
|
||
|
||
def require_api_key(request: Request):
    """Enforce the shared API key on a request, if one is configured.

    When API_KEY is empty, authentication is disabled and every request passes.
    Otherwise the "x-api-key" header must match API_KEY exactly.

    Raises:
        HTTPException: 401 when the header is missing or does not match.
    """
    if not API_KEY:
        return
    provided = request.headers.get("x-api-key") or ""
    # Use a constant-time comparison so response timing does not reveal how
    # much of the key was guessed correctly. Compare bytes, because
    # hmac.compare_digest raises TypeError for non-ASCII str arguments, and
    # header values may carry arbitrary latin-1 characters.
    if not hmac.compare_digest(provided.encode("utf-8"), API_KEY.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid api key")
|
||
|
||
|
||
def require_llm_base_url():
    """Guard LLM calls against a missing server URL.

    Raises:
        RuntimeError: when LLM_BASE_URL was empty or unset at import time.
    """
    if LLM_BASE_URL:
        return
    raise RuntimeError("LLM_BASE_URL is not set")
|
||
|
||
|
||
def text_has_any(text, keywords):
    """Return True when at least one keyword occurs as a substring of *text*."""
    for keyword in keywords:
        if keyword in text:
            return True
    return False
|
||
|
||
|
||
def classify_message(message):
|
||
text = (message or "").lower()
|
||
ecoish = (
|
||
text_has_any(text, ECO_TRIGGERS)
|
||
or text_has_any(text, ANOMALY_TOPIC_TRIGGERS)
|
||
or text_has_any(text, HISTORY_TRIGGERS)
|
||
)
|
||
if ecoish:
|
||
return "eco"
|
||
if text_has_any(text, META_TRIGGERS):
|
||
return "meta"
|
||
if text_has_any(text, SMALLTALK_TRIGGERS):
|
||
return "smalltalk"
|
||
text = text.strip()
|
||
if not text:
|
||
return "smalltalk"
|
||
return "eco"
|
||
|
||
|
||
# "station"/"станц…" followed by the first run of digits (any non-digit gap allowed).
_STATION_REF_RE = re.compile(r"(?:station|станц)[^0-9]*(\d+)", re.IGNORECASE)


def extract_station_id(text):
    """Pull a numeric station id out of free text such as "станция 12".

    Returns:
        The first integer that follows a "station"/"станц" mention, or None
        when *text* is empty or contains no such mention.
    """
    found = _STATION_REF_RE.search(text) if text else None
    if found is None:
        return None
    try:
        return int(found.group(1))
    except ValueError:
        return None
|
||
|
||
|
||
def parse_dt(value):
    """Parse an ISO-8601 string into an aware UTC datetime.

    A trailing "Z" is accepted. Inputs without an offset are taken to be UTC.

    Returns:
        None for empty input, otherwise a datetime converted to UTC.

    Raises:
        HTTPException: 400 when the string is not valid ISO-8601.
    """
    if not value:
        return None
    cleaned = value.strip()
    # Older datetime.fromisoformat versions do not understand the "Z" suffix.
    if cleaned.endswith("Z"):
        cleaned = f"{cleaned[:-1]}+00:00"
    try:
        parsed = datetime.fromisoformat(cleaned)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Invalid datetime: {cleaned}") from exc
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
|
||
|
||
|
||
# Strings shaped like ISO-8601 date-times (optional seconds, fraction, offset).
ISO_LLM_DATETIME_RE = re.compile(
    r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?$"
)


def format_llm_timestamp(value):
    """Render an ISO timestamp string as "YYYY-MM-DD HH:MM" in LOCAL_TZ.

    Non-strings, strings that do not look like ISO date-times, and strings
    that look like them but fail to parse are returned unchanged.
    """
    looks_like_iso = isinstance(value, str) and ISO_LLM_DATETIME_RE.match(value)
    if not looks_like_iso:
        return value
    try:
        moment = parse_dt(value)
    except HTTPException:
        return value
    return moment.astimezone(LOCAL_TZ).strftime("%Y-%m-%d %H:%M")
|
||
|
||
|
||
def localize_llm_times(value):
    """Recursively apply format_llm_timestamp to every leaf of a JSON-like value.

    Dicts and lists are rebuilt as new containers; all other values are passed
    to format_llm_timestamp.
    """
    if isinstance(value, dict):
        return {k: localize_llm_times(v) for k, v in value.items()}
    if isinstance(value, list):
        return list(map(localize_llm_times, value))
    return format_llm_timestamp(value)
|
||
|
||
|
||
def parse_date(value):
    """Parse a "YYYY-MM-DD" string into a date.

    Returns:
        None for empty input, otherwise a datetime.date.

    Raises:
        HTTPException: 400 when the string does not match the format.
    """
    if not value:
        return None
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Invalid date: {value}") from exc
    return parsed.date()
|
||
|
||
|
||
def day_bounds_utc(day):
    """Return the UTC [start, end) of a calendar day in LOCAL_TZ.

    Args:
        day: a date (or anything with year/month/day attributes).

    Returns:
        Tuple of two aware UTC datetimes: local midnight and the next local
        midnight.
    """
    start = datetime(day.year, day.month, day.day, tzinfo=LOCAL_TZ)
    end = start + timedelta(days=1)
    return tuple(bound.astimezone(timezone.utc) for bound in (start, end))
|
||
|
||
|
||
def dt_iso(dt):
    """Serialize a datetime as an ISO-8601 UTC string with a "Z" suffix.

    Naive datetimes are taken to be UTC. This matches parse_dt and
    local_date_key; the previous code let astimezone() read them as server
    local time, which gave results that depended on the host.

    Returns:
        None for None, otherwise e.g. "2024-01-02T03:04:05Z".
    """
    if dt is None:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
|
||
|
||
|
||
def local_date_key(dt):
    """Return the LOCAL_TZ calendar date of *dt* as "YYYY-MM-DD".

    Naive datetimes are taken to be UTC. None yields None.
    """
    if dt is None:
        return None
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
    return aware.astimezone(LOCAL_TZ).date().isoformat()
|
||
|
||
|
||
# One row per station: metadata plus the newest ingested_ts, the newest
# observed_ts, and the newest non-null AQI with its observed_ts.
_STATION_SNAPSHOT_SQL = """
    WITH last_ingested AS (
        SELECT station_id, MAX(ingested_ts) AS last_ingested_ts
        FROM measurements
        GROUP BY station_id
    ),
    last_observed AS (
        SELECT DISTINCT ON (station_id) station_id, observed_ts AS last_observed_ts
        FROM measurements
        ORDER BY station_id, observed_ts DESC
    ),
    last_aqi AS (
        SELECT DISTINCT ON (station_id) station_id, observed_ts AS last_aqi_ts, aqi AS last_aqi
        FROM measurements
        WHERE aqi IS NOT NULL
        ORDER BY station_id, observed_ts DESC
    )
    SELECT s.id,
           s.provider_uid,
           s.name,
           s.lat,
           s.lon,
           li.last_ingested_ts,
           lo.last_observed_ts,
           la.last_aqi,
           la.last_aqi_ts
    FROM stations s
    LEFT JOIN last_ingested li ON li.station_id = s.id
    LEFT JOIN last_observed lo ON lo.station_id = s.id
    LEFT JOIN last_aqi la ON la.station_id = s.id
    ORDER BY s.id;
"""


def build_station_snapshots(cur):
    """Run the per-station "latest values" query on an open cursor.

    Returns:
        All rows from cur.fetchall(), each a 9-tuple:
        (id, provider_uid, name, lat, lon, last_ingested_ts,
        last_observed_ts, last_aqi, last_aqi_ts), ordered by station id.
    """
    cur.execute(_STATION_SNAPSHOT_SQL)
    rows = cur.fetchall()
    return rows
|
||
|
||
|
||
def classify_station(now, last_ingested_ts, last_observed_ts):
    """Classify a station's data freshness relative to *now*.

    Returns:
        Tuple (in_flow, fresh, recent, stale) of bools. A station that is not
        in flow (nothing ingested within INGESTED_WINDOW_MINUTES) is all
        False. An in-flow station is exactly one of: fresh (observed within
        OBSERVED_FRESH_MINUTES), recent (within OBSERVED_RECENT_HOURS), or
        stale (older, or never observed).
    """
    ingest_floor = now - timedelta(minutes=INGESTED_WINDOW_MINUTES)
    fresh_floor = now - timedelta(minutes=OBSERVED_FRESH_MINUTES)
    recent_floor = now - timedelta(hours=OBSERVED_RECENT_HOURS)

    in_flow = bool(last_ingested_ts) and last_ingested_ts >= ingest_floor
    if not in_flow:
        return False, False, False, False
    if not last_observed_ts:
        return True, False, False, True

    fresh = last_observed_ts >= fresh_floor
    recent = recent_floor <= last_observed_ts < fresh_floor
    stale = last_observed_ts < recent_floor
    return True, fresh, recent, stale
|
||
|
||
|
||
def sort_stations_for_llm(stations):
    """Order station dicts for the LLM: worst air first.

    Stations with an AQI come before those without. Within each group the
    order is higher AQI first, then more recent last_observed_ts, then lower
    station_id.
    """

    def rank(station):
        aqi = station.get("last_aqi")
        observed_raw = station.get("last_observed_ts")
        observed_at = parse_dt(observed_raw) if observed_raw else None
        # Never-observed stations sort last within equal AQI.
        observed_epoch = float("-inf") if observed_at is None else observed_at.timestamp()
        aqi_rank = 0.0 if aqi is None else -float(aqi)
        return (aqi is None, aqi_rank, -observed_epoch, station.get("station_id") or 0)

    return sorted(stations, key=rank)
|
||
|
||
|
||
def select_llm_stations(stations):
    """Keep the top LLM_MAX_STATIONS stations (sorted by sort_stations_for_llm).

    Returns:
        dict with the kept "stations", the counts before and after
        truncation, a "stations_truncated" flag, and the name of the
        selection rule.
    """
    ranked = sort_stations_for_llm(stations)
    chosen = ranked[:LLM_MAX_STATIONS]
    return dict(
        stations=chosen,
        stations_total=len(ranked),
        stations_in_snapshot=len(chosen),
        stations_truncated=len(ranked) > len(chosen),
        selection_rule="aqi_desc_top_n",
    )
|
||
|
||
|
||
def fetch_stations(now):
    """Load every station with its latest readings and freshness flags.

    Args:
        now: reference time passed to classify_station().

    Returns:
        List of dicts, one per station. Each holds station metadata, the last
        AQI value, ISO-8601 UTC timestamps (last_aqi_ts, last_observed_ts,
        last_ingested_ts), and the in_flow/fresh/recent/stale flags.
    """
    require_db_url()
    # psycopg2's "with connection" block only commits or rolls back the
    # transaction. closing() makes sure the connection itself is released
    # instead of waiting for garbage collection.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            rows = build_station_snapshots(cur)

    items = []
    for (
        station_id,
        provider_uid,
        name,
        lat,
        lon,
        last_ingested_ts,
        last_observed_ts,
        last_aqi,
        last_aqi_ts,
    ) in rows:
        in_flow, fresh, recent, stale = classify_station(
            now, last_ingested_ts, last_observed_ts
        )
        items.append(
            {
                "station_id": station_id,
                "provider_uid": provider_uid,
                "name": name,
                "lat": lat,
                "lon": lon,
                "last_aqi": last_aqi,
                "last_aqi_ts": dt_iso(last_aqi_ts),
                "last_observed_ts": dt_iso(last_observed_ts),
                "last_ingested_ts": dt_iso(last_ingested_ts),
                "in_flow": in_flow,
                "fresh": fresh,
                "recent": recent,
                "stale": stale,
            }
        )
    return items
|
||
|
||
|
||
def fetch_summary(now, from_ts, to_ts):
    """Aggregate network-wide station health and current AQI for a snapshot.

    Args:
        now: reference time for classify_station().
        from_ts, to_ts: the requested range. It is only echoed back under
            "range"; the counts always describe each station's latest data.

    Returns:
        dict with:
          - station counts (total / in_flow / fresh / recent / stale);
          - avg/max/count of the latest AQI over in-flow stations;
          - the newest ingested and observed timestamps across all stations;
          - the freshness window settings and "now".
    """
    require_db_url()
    # closing() releases the connection; psycopg2's own "with conn" only
    # ends the transaction.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM stations;")
            stations_total = cur.fetchone()[0]
            rows = build_station_snapshots(cur)

    inflow_count = 0
    fresh_count = 0
    recent_count = 0
    stale_count = 0
    last_ingested = None
    last_observed = None
    aqi_values = []

    for (
        _station_id,
        _provider_uid,
        _name,
        _lat,
        _lon,
        last_ingested_ts,
        last_observed_ts,
        last_aqi,
        _last_aqi_ts,
    ) in rows:
        # The newest timestamps are tracked over every station, in flow or not.
        if last_ingested_ts and (last_ingested is None or last_ingested_ts > last_ingested):
            last_ingested = last_ingested_ts
        if last_observed_ts and (last_observed is None or last_observed_ts > last_observed):
            last_observed = last_observed_ts

        in_flow, fresh, recent, _stale = classify_station(
            now, last_ingested_ts, last_observed_ts
        )
        if not in_flow:
            continue
        inflow_count += 1
        # fresh/recent/stale partition the in-flow stations.
        if fresh:
            fresh_count += 1
        elif recent:
            recent_count += 1
        else:
            stale_count += 1
        if last_aqi is not None:
            aqi_values.append(last_aqi)

    avg_now = None
    max_now = None
    if aqi_values:
        avg_now = sum(aqi_values) / len(aqi_values)
        max_now = max(aqi_values)

    return {
        "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)},
        "windows": {
            "ingested_minutes": INGESTED_WINDOW_MINUTES,
            "observed_fresh_minutes": OBSERVED_FRESH_MINUTES,
            "observed_recent_hours": OBSERVED_RECENT_HOURS,
        },
        "stations_total": stations_total,
        "in_flow": inflow_count,
        "fresh": fresh_count,
        "recent": recent_count,
        "stale": stale_count,
        "aqi_now": {
            "avg": avg_now,
            "max": max_now,
            "count": len(aqi_values),
        },
        "last_ingested_ts": dt_iso(last_ingested),
        "last_observed_ts": dt_iso(last_observed),
        "now": dt_iso(now),
    }
|
||
|
||
|
||
def fetch_anomalies_summary(from_ts, to_ts, anomaly_types=None, station_id=None):
    """Count anomalies whose ts_start lies in [from_ts, to_ts].

    Args:
        from_ts, to_ts: inclusive bounds on anomalies.ts_start.
        anomaly_types: a single type string or an iterable of type strings to
            filter on; falsy means all types.
        station_id: restrict to one station when not None.

    Returns:
        {"has_anomalies": bool, "count": int}
    """
    require_db_url()
    params = [from_ts, to_ts]
    where = ["a.ts_start >= %s", "a.ts_start <= %s"]
    if station_id is not None:
        where.append("a.station_id = %s")
        params.append(station_id)
    if anomaly_types:
        types = [anomaly_types] if isinstance(anomaly_types, str) else list(anomaly_types)
        if types:
            where.append("a.type = ANY(%s)")
            params.append(types)
    # Only fixed clause fragments are joined into the SQL; all values are bound parameters.
    where_sql = " AND ".join(where)

    # closing() releases the connection; psycopg2's "with conn" only ends the transaction.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                f"""
                SELECT COUNT(*)
                FROM anomalies a
                WHERE {where_sql};
                """,
                params,
            )
            count = cur.fetchone()[0]
    return {"has_anomalies": count > 0, "count": count}
|
||
|
||
|
||
def build_snapshot(now, from_ts, to_ts, mode, station_focus=None, max_stations=None):
    """Assemble the snapshot_v1 payload given to the router/answer LLM calls.

    Args:
        now: reference time (freshness and generated_at).
        from_ts, to_ts: UI range; it is echoed back and bounds the anomaly count.
        mode: UI mode string, passed through unchanged.
        station_focus: optional station id; it narrows the anomaly count.
        max_stations: optional extra cap below LLM_MAX_STATIONS.

    Returns:
        dict with type/request_id/generated_at, ui, limits, data and
        tools_whitelist sections.
    """
    summary = fetch_summary(now, from_ts, to_ts)
    selection = select_llm_stations(fetch_stations(now))

    # An explicit caller cap can only shrink the default selection further.
    if max_stations and max_stations < selection["stations_in_snapshot"]:
        capped = selection["stations"][:max_stations]
        selection.update(
            stations=capped,
            stations_in_snapshot=len(capped),
            stations_truncated=selection["stations_total"] > len(capped),
        )

    anomalies_summary = fetch_anomalies_summary(
        from_ts,
        to_ts,
        anomaly_types=["spike", "stuck"],
        station_id=station_focus,
    )

    ui_block = {
        "mode": mode,
        "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts), "tz": UI_TZ},
        "station_focus": station_focus,
    }
    limits_block = {
        "max_tool_iterations": MAX_TOOL_ITERATIONS,
        "max_range_hours_snapshot": MAX_RANGE_HOURS_SNAPSHOT,
        "max_range_days_anomalies": MAX_RANGE_DAYS_ANOMALIES,
        "max_range_days_history": MAX_RANGE_DAYS_HISTORY,
        "max_stations_in_snapshot": MAX_STATIONS_IN_SNAPSHOT,
        "max_anomalies": MAX_ANOMALIES,
        "max_history_points": MAX_HISTORY_POINTS,
        "min_history_step_seconds": MIN_HISTORY_STEP_SECONDS,
    }
    data_block = {
        "summary": summary,
        "stations": selection["stations"],
        "anomalies_summary": anomalies_summary,
        "stations_in_snapshot": selection["stations_in_snapshot"],
        "stations_truncated": selection["stations_truncated"],
        "selection_rule": selection["selection_rule"],
    }
    return {
        "type": "snapshot_v1",
        "request_id": str(uuid.uuid4()),
        "generated_at": dt_iso(now),
        "ui": ui_block,
        "limits": limits_block,
        "data": data_block,
        "tools_whitelist": TOOLS_WHITELIST,
    }
|
||
|
||
|
||
def normalize_json(value):
    """Decode a JSON string column value; leave everything else untouched.

    Strings that are not valid JSON are returned as-is. Non-string values
    (None, dicts, lists, numbers, ...) pass through unchanged.
    """
    if not isinstance(value, str):
        return value
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return value
|
||
|
||
|
||
def fetch_anomalies(from_ts, to_ts, limit, station_id, anomaly_types):
    """Return anomaly records whose ts_start lies in [from_ts, to_ts].

    Args:
        from_ts, to_ts: inclusive bounds on anomalies.ts_start.
        limit: maximum number of rows (SQL LIMIT).
        station_id: restrict to one station when not None.
        anomaly_types: a single type string or an iterable of type strings;
            falsy means all types.

    Returns:
        {"range": {...}, "items": [...]} with items ordered newest ts_start
        first. Timestamps are ISO-8601 UTC strings and "evidence" is decoded
        from JSON when it is stored as text.
    """
    require_db_url()
    params = [from_ts, to_ts]
    where = ["a.ts_start >= %s", "a.ts_start <= %s"]
    if station_id is not None:
        where.append("a.station_id = %s")
        params.append(station_id)
    if anomaly_types:
        types = [anomaly_types] if isinstance(anomaly_types, str) else list(anomaly_types)
        if types:
            where.append("a.type = ANY(%s)")
            params.append(types)
    params.append(limit)
    # Only fixed clause fragments are interpolated; all values are bound parameters.
    where_sql = " AND ".join(where)

    # closing() releases the connection; psycopg2's "with conn" only ends the transaction.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                f"""
                SELECT a.id, a.station_id, s.name, a.type, a.severity, a.confidence,
                       a.ts_start, a.ts_end, a.metric, a.evidence, a.created_at
                FROM anomalies a
                LEFT JOIN stations s ON s.id = a.station_id
                WHERE {where_sql}
                ORDER BY a.ts_start DESC
                LIMIT %s;
                """,
                params,
            )
            rows = cur.fetchall()

    items = [
        {
            "id": anomaly_id,
            "station_id": row_station_id,
            "station_name": station_name,
            "type": row_type,
            "severity": severity,
            "confidence": confidence,
            "ts_start": dt_iso(ts_start),
            "ts_end": dt_iso(ts_end),
            "metric": metric,
            "evidence": normalize_json(evidence),
            "created_at": dt_iso(created_at),
        }
        for (
            anomaly_id,
            row_station_id,
            station_name,
            row_type,
            severity,
            confidence,
            ts_start,
            ts_end,
            metric,
            evidence,
            created_at,
        ) in rows
    ]
    return {
        "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)},
        "items": items,
    }
|
||
|
||
|
||
def fetch_history(from_ts, to_ts, since_ts):
    """Return raw measurements for all stations with observed_ts in [from_ts, to_ts).

    Args:
        from_ts, to_ts: half-open observation window.
        since_ts: optional cursor. When it is later than from_ts, only rows
            with observed_ts strictly after it are returned (incremental
            polling).

    Returns:
        List of dicts (station metadata, ISO-8601 UTC observed/ingested
        timestamps, aqi), ordered by observed_ts, then station_id.
    """
    require_db_url()
    params = [from_ts, to_ts]
    conditions = ["m.observed_ts >= %s", "m.observed_ts < %s"]
    if since_ts and since_ts > from_ts:
        conditions.append("m.observed_ts > %s")
        params.append(since_ts)
    where_sql = " AND ".join(conditions)

    # closing() releases the connection; psycopg2's "with conn" only ends the transaction.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                f"""
                SELECT m.station_id,
                       s.provider_uid,
                       s.name,
                       s.lat,
                       s.lon,
                       m.observed_ts,
                       m.ingested_ts,
                       m.aqi
                FROM measurements m
                JOIN stations s ON s.id = m.station_id
                WHERE {where_sql}
                ORDER BY m.observed_ts ASC, m.station_id ASC;
                """,
                params,
            )
            rows = cur.fetchall()

    return [
        {
            "station_id": station_id,
            "provider_uid": provider_uid,
            "name": name,
            "lat": lat,
            "lon": lon,
            "observed_ts": dt_iso(observed_ts),
            "ingested_ts": dt_iso(ingested_ts),
            "aqi": aqi,
        }
        for (
            station_id,
            provider_uid,
            name,
            lat,
            lon,
            observed_ts,
            ingested_ts,
            aqi,
        ) in rows
    ]
|
||
|
||
|
||
def fetch_history_bounds():
    """Return (earliest, latest) observed_ts across all measurements.

    Both values are None when the table is empty (MIN/MAX over zero rows),
    or when no row comes back at all.
    """
    require_db_url()
    # closing() releases the connection; psycopg2's "with conn" only ends the transaction.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                """
                SELECT MIN(observed_ts), MAX(observed_ts)
                FROM measurements;
                """
            )
            row = cur.fetchone()
    if not row:
        return None, None
    return row[0], row[1]
|
||
|
||
|
||
def compute_history_step(from_ts, to_ts, requested_step_seconds):
    """Choose a bucket width (seconds) for a station history series.

    The requested step (default MIN_HISTORY_STEP_SECONDS) is raised to at
    least MIN_HISTORY_STEP_SECONDS. It is then raised further if needed so
    that the range yields no more than about MAX_HISTORY_POINTS buckets.
    """
    span_seconds = max((to_ts - from_ts).total_seconds(), 1)
    # Smallest step that keeps the bucket count within MAX_HISTORY_POINTS.
    points_floor = int(span_seconds / MAX_HISTORY_POINTS) + 1
    candidate = max(requested_step_seconds or MIN_HISTORY_STEP_SECONDS, MIN_HISTORY_STEP_SECONDS)
    return points_floor if points_floor > candidate else candidate
|
||
|
||
|
||
def fetch_station_history(station_id, from_ts, to_ts, step_seconds):
    """Return one station's AQI averaged into fixed-width time buckets.

    Args:
        station_id: station to query.
        from_ts, to_ts: inclusive observation window (aware datetimes).
        step_seconds: requested bucket width; it is adjusted by
            compute_history_step().

    Returns:
        dict with station_id, the range, the effective step_seconds, and
        "points": a list of {"ts", "aqi"}, one per bucket that holds at least
        one non-null AQI. Each bucket is keyed by its start time,
        from_ts + k * step.
    """
    require_db_url()
    # closing() releases the connection; psycopg2's "with conn" only ends the transaction.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                """
                SELECT observed_ts, aqi
                FROM measurements
                WHERE station_id = %s
                  AND observed_ts >= %s
                  AND observed_ts <= %s
                ORDER BY observed_ts ASC;
                """,
                [station_id, from_ts, to_ts],
            )
            rows = cur.fetchall()

    step_seconds = compute_history_step(from_ts, to_ts, step_seconds)
    buckets = {}
    for observed_ts, aqi in rows:
        index = int((observed_ts - from_ts).total_seconds() // step_seconds)
        bucket_ts = from_ts + timedelta(seconds=index * step_seconds)
        bucket = buckets.setdefault(bucket_ts, {"sum": 0.0, "count": 0})
        if aqi is not None:
            bucket["sum"] += float(aqi)
            bucket["count"] += 1

    points = []
    for bucket_ts in sorted(buckets):
        count = buckets[bucket_ts]["count"]
        # Buckets that only saw null AQI readings produce no point.
        if count <= 0:
            continue
        points.append(
            {
                "ts": dt_iso(bucket_ts),
                "aqi": buckets[bucket_ts]["sum"] / count,
            }
        )

    return {
        "station_id": station_id,
        "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)},
        "step_seconds": step_seconds,
        "points": points,
    }
|
||
|
||
|
||
# System prompt for the routing LLM call (run_router). It tells the model to
# emit router_decision_v1 JSON; the JSON shape itself is also sent as a
# json_schema response_format (router_response_format()).
# The prompt text is intentionally Russian and is sent verbatim.
ROUTER_PROMPT = (
    "Ты — Router AIP Ecology. Верни ТОЛЬКО JSON по схеме router_decision_v1.\n"
    "Всегда указывай request_type: smalltalk | meta | eco.\n"
    "\n"
    "Определение request_type:\n"
    "- smalltalk: привет/как дела/спасибо/ок/анекдот/тест/поболтать.\n"
    "- meta: вопросы «почему ты», «при чем тут», «ты о чем», вопросы о системе/промптах/tools/snapshot.\n"
    "- eco: вопросы про воздух/AQI/станции/аномалии/историю/диапазон/свежесть.\n"
    "\n"
    "Правила:\n"
    "- Если request_type = smalltalk или meta -> type = no_tool.\n"
    "- Если request_type = eco и snapshot_v1 достаточно -> type = no_tool, reason = snapshot_enough.\n"
    "- Если request_type = eco и нужны детали -> type = tool_call_v1.\n"
    "Инструменты:\n"
    "- get_anomalies: списки/подтверждение аномалий.\n"
    "- get_station_history: таймсерия по станции.\n"
    "- get_snapshot: только если нужен другой диапазон/режим или snapshot отсутствует.\n"
    "Без лишнего текста."
)
# System prompt for the answer-generation LLM call.
# NOTE(review): its caller is not in this chunk. The REQUEST_TYPE,
# SNAPSHOT_JSON, TOOL_RESULT and ANOMALIES_BRIEF inputs it refers to are
# presumably supplied there — confirm.
ANSWER_PROMPT = (
    "Ты — аналитик AIP Ecology.\n"
    "REQUEST_TYPE уже определён на backend. Не переклассифицируй.\n"
    "\n"
    "Правила по REQUEST_TYPE:\n"
    "- SMALLTALK/TEST: ответь кратко (1-2 предложения) на языке пользователя. "
    "НЕ упоминай snapshot_v1, tool_result_v1, качество воздуха/AQI, мониторинг, станции, аномалии, диапазоны, временные метки, "
    "\"нет данных\", ingestion/observation или аналитику.\n"
    "- META: кратко объясни на языке пользователя. "
    "Не анализируй качество воздуха и не упоминай конкретные станции, если явно не спросили.\n"
    "- ECO: отвечай ТОЛЬКО по SNAPSHOT_JSON и TOOL_RESULT. Не выдумывай.\n"
    "\n"
    "Правила ECO:\n"
    "- Не добавляй строку вида \"Период: ...\" — период показывает UI.\n"
    "- Упоминай аномалии ТОЛЬКО если пользователь спросил про аномалии/риски ИЛИ TOOL_RESULT.tool == get_anomalies.\n"
    "- Если TOOL_RESULT есть и ok=false: ответь кратко: \"ошибка инструмента: <code> — <message>\".\n"
    "- Если TOOL_RESULT items/points пустые ИЛИ error.code == NO_DATA: ответ строго: \"нет данных\".\n"
    "- Если есть ANOMALIES_BRIEF: используй только его факты и числа, не выводи ISO/JSON.\n"
    " Обычно достаточно 1–2 связных предложения, затем 2–5 коротких строк с примерами.\n"
    "\n"
    "Стиль: деловой и лаконичный. Эмодзи допустимы, но умеренно. Без лишних вступлений и без пустых фраз.\n"
    "Отвечай на языке пользователя."
)
|
||
|
||
|
||
def router_response_format():
    """Build the json_schema response_format for the router LLM call.

    The schema accepts exactly one of two decisions: a tool_call_v1 (tool
    from TOOLS_WHITELIST plus an args object), or a no_tool decision with an
    optional reason. Both must carry a request_type from REQUEST_TYPES.
    """

    def request_type_field():
        return {"type": "string", "enum": REQUEST_TYPES}

    tool_call_branch = {
        "type": "object",
        "required": ["type", "request_type", "id", "tool", "args"],
        "properties": {
            "type": {"const": "tool_call_v1"},
            "request_type": request_type_field(),
            "id": {"type": "string", "minLength": 1},
            "tool": {"type": "string", "enum": TOOLS_WHITELIST},
            "args": {"type": "object"},
        },
        "additionalProperties": False,
    }
    no_tool_branch = {
        "type": "object",
        "required": ["type", "request_type"],
        "properties": {
            "type": {"const": "no_tool"},
            "request_type": request_type_field(),
            "reason": {"type": "string"},
        },
        "additionalProperties": False,
    }
    schema = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "router_decision_v1",
        "type": "object",
        "oneOf": [tool_call_branch, no_tool_branch],
    }
    return {
        "type": "json_schema",
        "json_schema": {
            "name": "router_decision_v1",
            "strict": True,
            "schema": schema,
        },
    }
|
||
|
||
|
||
def call_llm(messages, temperature, max_tokens, response_format=None):
    """POST a chat-completions request and return the first choice's content.

    Args:
        messages: OpenAI-style message list.
        temperature, max_tokens: sampling parameters, forwarded as-is.
        response_format: optional response_format payload (e.g. a json_schema).

    Returns:
        choices[0].message.content from the response (may be None if the
        server sends null).

    Raises:
        RuntimeError: when LLM_BASE_URL is not configured.
        HTTPException: 502 on transport errors, HTTP >= 400, a non-JSON body,
            or an unexpected response shape.
    """
    require_llm_base_url()
    payload = {
        "model": LLM_MODEL,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
        **({"response_format": response_format} if response_format else {}),
    }
    endpoint = f"{LLM_BASE_URL}/v1/chat/completions"
    try:
        with httpx.Client(timeout=LLM_TIMEOUT) as client:
            resp = client.post(endpoint, json=payload)
    except httpx.RequestError as exc:
        raise HTTPException(status_code=502, detail=f"LLM request failed: {exc}") from exc

    if resp.status_code >= 400:
        raise HTTPException(
            status_code=502,
            detail=f"LLM error {resp.status_code}: {resp.text}",
        )

    # Lenient decoding: valid UTF-8 decodes normally; bad bytes become U+FFFD.
    raw_text = resp.content.decode("utf-8", errors="replace")
    try:
        data = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        raise HTTPException(status_code=502, detail="LLM response invalid JSON") from exc

    try:
        first_choice = data["choices"][0]
        return first_choice["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        raise HTTPException(status_code=502, detail="LLM response invalid") from exc
|
||
|
||
|
||
def parse_json_content(text):
    """Best-effort JSON extraction from LLM output.

    First the whole text is parsed. If that fails, the span from the first
    "{" to the last "}" is parsed instead, which tolerates prose or code
    fences around an object.

    Returns:
        The decoded value, or None when the text is empty or nothing parses.
    """
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    first_brace = text.find("{")
    last_brace = text.rfind("}")
    if -1 in (first_brace, last_brace) or last_brace <= first_brace:
        return None
    try:
        return json.loads(text[first_brace:last_brace + 1])
    except json.JSONDecodeError:
        return None
|
||
|
||
|
||
def normalize_request_type(value, reason, message):
    """Coerce the router's request_type into one of REQUEST_TYPES.

    Resolution order:
      1. The value itself (trimmed and lower-cased when it is a string).
      2. A "smalltalk" or "meta" hint inside *reason*.
      3. classify_message(message), or "eco" when there is no message.
    """
    candidate = value.strip().lower() if isinstance(value, str) else value
    if candidate in REQUEST_TYPES:
        return candidate

    reason_text = (reason or "").lower()
    for hint in ("smalltalk", "meta"):
        if hint in reason_text:
            return hint

    fallback = classify_message(message) if message else "eco"
    return fallback if fallback in REQUEST_TYPES else "eco"
|
||
|
||
|
||
def normalize_router_decision(decision, message):
    """Validate a parsed router decision and reduce it to a canonical dict.

    Returns either
      {"type": "tool_call_v1", "id", "tool", "args", "request_type"}
    for a well-formed, whitelisted tool call, or
      {"type": "no_tool", "reason", "request_type"}
    otherwise. The reason is "router_invalid_json" or "router_invalid_tool"
    when the input was malformed.
    """
    if not isinstance(decision, dict):
        return {
            "type": "no_tool",
            "reason": "router_invalid_json",
            "request_type": normalize_request_type(None, None, message),
        }

    kind = decision.get("type")
    reason = decision.get("reason")
    request_type = normalize_request_type(decision.get("request_type"), reason, message)

    def no_tool(why):
        return {"type": "no_tool", "reason": why, "request_type": request_type}

    if kind == "tool_call_v1":
        tool = decision.get("tool")
        args = decision.get("args")
        if tool in TOOLS_WHITELIST and isinstance(args, dict):
            return {
                "type": "tool_call_v1",
                # Synthesize an id when the model omitted one.
                "id": decision.get("id") or f"call_{uuid.uuid4().hex[:8]}",
                "tool": tool,
                "args": args,
                "request_type": request_type,
            }
        return no_tool("router_invalid_tool")

    if kind == "no_tool":
        return no_tool(reason or "snapshot_enough")

    return no_tool("router_invalid_json")
|
||
|
||
|
||
def run_router(snapshot_json, message):
    """Ask the router LLM whether (and which) tool to call for *message*.

    If the first reply is not parseable JSON, one repair round-trip asks the
    model to re-emit only schema-conforming JSON.

    Returns:
        The result of normalize_router_decision().
    """
    schema_format = router_response_format()
    prompt_messages = [
        {"role": "system", "content": ROUTER_PROMPT},
        {"role": "user", "content": f"SNAPSHOT_JSON: {snapshot_json}"},
        {"role": "user", "content": f"QUESTION: {message}"},
    ]
    raw_reply = call_llm(
        prompt_messages,
        temperature=0,
        max_tokens=200,
        response_format=schema_format,
    )
    decision = parse_json_content(raw_reply)

    if decision is None:
        repair_prompt = [
            {
                "role": "system",
                "content": "Return only JSON that matches the schema. Include request_type. No extra text.",
            },
            {"role": "user", "content": raw_reply},
        ]
        repaired_reply = call_llm(
            repair_prompt,
            temperature=0,
            max_tokens=200,
            response_format=router_response_format(),
        )
        decision = parse_json_content(repaired_reply)

    return normalize_router_decision(decision, message)
|
||
|
||
|
||
def snapshot_range_args(snapshot):
    """Copy the snapshot's UI range into a tool "range" argument.

    The timezone falls back to UI_TZ when the snapshot has none. Raises
    KeyError when ui.range.from/to are missing.
    """
    ui_range = snapshot["ui"]["range"]
    args = {key: ui_range[key] for key in ("from", "to")}
    args["tz"] = ui_range.get("tz") or UI_TZ
    return args
|
||
|
||
|
||
def build_period_payload(snapshot, tool_result=None):
    """Describe the period an answer covers, for display by the UI.

    The range comes from a successful tool result's data.range, or otherwise
    from the snapshot's ui.range.

    Returns:
        {"from", "to", "tz"} (tz defaulting to UI_TZ), or None when either
        bound is missing.
    """
    range_data = None
    if tool_result and tool_result.get("ok"):
        range_data = (tool_result.get("data") or {}).get("range")
    range_data = range_data or snapshot.get("ui", {}).get("range") or {}

    start, end = range_data.get("from"), range_data.get("to")
    if not (start and end):
        return None
    return {"from": start, "to": end, "tz": range_data.get("tz") or UI_TZ}
|
||
|
||
|
||
def wants_anomaly_details(message):
    """True when the message is about anomalies AND asks for specifics."""
    text = (message or "").lower()
    return text_has_any(text, ANOMALY_TOPIC_TRIGGERS) and text_has_any(
        text, ANOMALY_DETAIL_TRIGGERS
    )
|
||
|
||
|
||
def wants_anomaly_full_list(message):
    """True when the message is about anomalies AND asks for the complete list."""
    text = (message or "").lower()
    return text_has_any(text, ANOMALY_TOPIC_TRIGGERS) and text_has_any(
        text, ANOMALY_FULL_LIST_TRIGGERS
    )
|
||
|
||
|
||
def wants_anomaly_chrono(message):
    """True when the message asks for chronological ordering."""
    return text_has_any((message or "").lower(), ANOMALY_TIME_ORDER_TRIGGERS)
|
||
|
||
|
||
def wants_history(message):
    """True when the message asks for a time series / history view."""
    return text_has_any((message or "").lower(), HISTORY_TRIGGERS)
|
||
|
||
|
||
def apply_fallbacks(decision, message, snapshot, station_focus, message_type):
    """Upgrade a router "no_tool" decision to a tool call when keywords demand it.

    Applies only to eco messages the router chose not to tool. In order:
      - anomaly-detail requests -> get_anomalies over the snapshot range;
      - history requests with a known station (the focus, or one parsed from
        the text) -> get_station_history.
    Any other decision is returned unchanged.

    Synthesized decisions carry "request_type", so they have the same shape
    as normalize_router_decision() output. Previously they lacked that key.
    """
    if message_type != "eco" or decision["type"] != "no_tool":
        return decision

    station_id = station_focus or extract_station_id(message)

    if wants_anomaly_details(message):
        return {
            "type": "tool_call_v1",
            "id": f"call_{uuid.uuid4().hex[:8]}",
            "tool": "get_anomalies",
            "args": {
                "range": snapshot_range_args(snapshot),
                "types": ["spike", "stuck"],
                "limit": MAX_ANOMALIES,
            },
            "request_type": message_type,
        }

    if station_id and wants_history(message):
        return {
            "type": "tool_call_v1",
            "id": f"call_{uuid.uuid4().hex[:8]}",
            "tool": "get_station_history",
            "args": {
                "station_id": station_id,
                "range": snapshot_range_args(snapshot),
                "step_seconds": MIN_HISTORY_STEP_SECONDS,
            },
            "request_type": message_type,
        }

    return decision
|
||
|
||
|
||
def sanitize_tool_call(decision, station_focus, snapshot):
    """Normalize a router tool call before it is executed.

    * Decisions that are not ``tool_call_v1`` are returned unchanged.
    * ``args`` is coerced to a dict.
    * For every range-based tool (``get_snapshot``, ``get_anomalies``,
      ``get_station_history``) a missing/empty ``range`` defaults to the
      snapshot's UI range via ``snapshot_range_args``.
    * ``get_station_history``: ``station_id`` falls back to ``station_focus``
      and is coerced to ``int``. If it is missing or not integer-like, a
      ``no_tool`` decision is returned instead, with reason
      ``history_requires_station_id`` or ``history_invalid_station_id``.
    * ``get_anomalies``: a ``max_anomalies`` argument is accepted as ``limit``
      when ``limit`` is absent.
    * Unknown tools are returned unchanged.

    Mutates ``decision`` (and its ``args``) in place and returns it.
    """
    if decision.get("type") != "tool_call_v1":
        return decision

    tool = decision.get("tool")
    args = decision.get("args") or {}
    if not isinstance(args, dict):
        args = {}

    def no_tool(reason):
        return {
            "type": "no_tool",
            "reason": reason,
            "request_type": decision.get("request_type") or "eco",
        }

    if tool == "get_station_history":
        station_id = args.get("station_id")
        if station_id is None:
            station_id = station_focus
        if station_id is None:
            return no_tool("history_requires_station_id")
        try:
            station_id = int(station_id)
        except (TypeError, ValueError):
            return no_tool("history_invalid_station_id")
        args["station_id"] = station_id
    elif tool == "get_anomalies":
        if "limit" not in args and "max_anomalies" in args:
            try:
                args["limit"] = int(args.get("max_anomalies"))
            except (TypeError, ValueError):
                # Best effort: an unusable value is ignored and the handler's
                # default limit applies.
                pass
    elif tool != "get_snapshot":
        return decision

    # All three tools require a range. get_snapshot previously got no default,
    # so a router call without one always failed with INVALID_RANGE.
    if not args.get("range"):
        args["range"] = snapshot_range_args(snapshot)

    decision["args"] = args
    return decision
|
||
|
||
|
||
def tool_result_is_no_data(tool_result):
    """Tell whether a tool result means "there is nothing to report".

    True for a failed result whose error code is ``NO_DATA``
    (case-insensitive), and for a ``get_station_history`` result without
    points. Everything else, including a missing result and any
    ``get_anomalies`` result, is False.
    """
    if not tool_result:
        return False

    if tool_result.get("ok") is False:
        error_info = tool_result.get("error") or {}
        return str(error_info.get("code") or "").upper() == "NO_DATA"

    if tool_result.get("tool") != "get_station_history":
        return False

    result_data = tool_result.get("data") or {}
    return not result_data.get("points")
|
||
|
||
|
||
def format_tool_error(tool_result):
    """Render a failed tool result as a short user-facing (Russian) message.

    The code defaults to ``ERROR``; the message part is appended only when present.
    """
    details = (tool_result or {}).get("error") or {}
    code = details.get("code") or "ERROR"
    text = details.get("message") or ""
    if not text:
        return f"ошибка инструмента: {code}"
    return f"ошибка инструмента: {code} — {text}"
|
||
|
||
|
||
def strip_iso_period_lines(text):
    """Remove "Период:/Period: <ISO ts> - <ISO ts>" lines from LLM output.

    Carriage returns are dropped and the result is stripped. If removing the
    matching lines would leave nothing, the original ``text`` is returned
    unchanged. Falsy input is returned as is.
    """
    if not text:
        return text
    iso_ts = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?"
    # Accept hyphen, en dash and em dash as the range separator; the em dash
    # is common in Russian text (and in this file's own messages).
    pattern = re.compile(rf"^\s*(?:Период|Period)\s*:\s*{iso_ts}\s*[-–—]\s*{iso_ts}\s*$")
    lines = str(text).replace("\r", "").split("\n")
    kept = [line for line in lines if not pattern.match(line.strip())]
    cleaned = "\n".join(kept).strip()
    return cleaned or text
|
||
|
||
|
||
def clean_station_name(name, station_id=None):
    """Return a display name: ``name`` without any " (...)" suffix.

    When ``name`` is empty, falls back to "station <id>", or just "station"
    when no id is given.
    """
    if name:
        head, _sep, _rest = name.partition(" (")
        return head
    if station_id is None:
        return "station"
    return f"station {station_id}"
|
||
|
||
|
||
def parse_local_dt(value):
    """Parse ``value`` with ``parse_dt`` and convert it to LOCAL_TZ.

    Returns None for empty input or when ``parse_dt`` rejects the value
    (signalled by HTTPException).
    """
    if not value:
        return None
    try:
        parsed = parse_dt(value)
    except HTTPException:
        return None
    else:
        return parsed.astimezone(LOCAL_TZ)
|
||
|
||
|
||
def format_time_local(dt):
    """Format ``dt`` as ``HH:MM``; None when ``dt`` is missing."""
    if not dt:
        return None
    return dt.strftime("%H:%M")
|
||
|
||
|
||
def format_date_local(dt):
    """Format ``dt`` as ``DD.MM``; None when ``dt`` is missing."""
    if not dt:
        return None
    return dt.strftime("%d.%m")
|
||
|
||
|
||
def is_utc_day_range(from_ts, to_ts):
    """Tell whether two ISO-8601 strings bound exactly one calendar day.

    Both strings must share the same ``YYYY-MM-DD`` prefix. ``from_ts`` must
    end at ``T00:00`` (optionally ``:00`` plus fractional seconds) and
    ``to_ts`` at ``T23:59`` (optionally ``:59`` plus fractional seconds).
    Each may carry a ``Z`` or ``±HH:MM`` suffix. Missing values give False.
    """
    if not from_ts or not to_ts:
        return False
    if from_ts[:10] != to_ts[:10]:
        return False
    # These are raw strings, so a single backslash is the regex escape.
    # Double-escaping (``\\d``) would match a literal backslash, and offsets
    # like "+00:00" or fractional seconds could then never match.
    start_ok = re.search(r"T00:00(?::00(?:\.\d+)?)?(?:Z|[+-]\d{2}:\d{2})?$", from_ts) is not None
    end_ok = re.search(r"T23:59(?::59(?:\.\d+)?)?(?:Z|[+-]\d{2}:\d{2})?$", to_ts) is not None
    return start_ok and end_ok
|
||
|
||
|
||
def range_span_days(range_info):
    """Tell whether the range's ``from`` and ``to`` fall on different local dates.

    Both ends are converted to LOCAL_TZ via ``parse_local_dt``; if either
    cannot be parsed the answer is False.
    """
    start = parse_local_dt(range_info.get("from"))
    end = parse_local_dt(range_info.get("to"))
    if start and end:
        return start.date() != end.date()
    return False
|
||
|
||
|
||
def build_period_label(range_info):
    """Return a ``DD.MM`` label when the range covers a single day, else None.

    A whole-day range (per ``is_utc_day_range``) is labelled with the date
    prefix of ``from`` as written. Any other range is labelled with the local
    (LOCAL_TZ) date when both ends fall on the same local day.
    """
    from_ts = range_info.get("from")
    to_ts = range_info.get("to")

    if not is_utc_day_range(from_ts, to_ts):
        start = parse_local_dt(from_ts)
        end = parse_local_dt(to_ts)
        if start and end and start.date() == end.date():
            return format_date_local(start)
        return None

    try:
        day = datetime.strptime(from_ts[:10], "%Y-%m-%d")
    except ValueError:
        return None
    return day.strftime("%d.%m")
|
||
|
||
|
||
def dedupe_anomalies(items):
    """Drop repeated anomaly records, keeping the first occurrence of each.

    Two records count as duplicates when ``anomaly_key`` gives the same tuple
    (station, type, start, end, evidence delta). Using the shared helper keeps
    this identity in sync with the strongest-item exclusion in
    ``build_anomalies_brief``, which also relies on ``anomaly_key``.

    Returns a new list; the input is not modified.
    """
    seen = set()
    cleaned = []
    for item in items:
        key = anomaly_key(item)
        if key in seen:
            continue
        seen.add(key)
        cleaned.append(item)
    return cleaned
|
||
|
||
|
||
def anomaly_key(item):
    """Identity tuple for an anomaly: (station_id, type, ts_start, ts_end, evidence delta)."""
    identity = tuple(item.get(field) for field in ("station_id", "type", "ts_start", "ts_end"))
    delta = (item.get("evidence") or {}).get("delta")
    return identity + (delta,)
|
||
|
||
|
||
def anomaly_delta(item):
    """Sort key: the numeric evidence delta, or -1 when it is missing/non-numeric."""
    value = (item.get("evidence") or {}).get("delta")
    return value if isinstance(value, (int, float)) else -1
|
||
|
||
|
||
def anomaly_time_label(item, include_date):
    """Local start-time label for an anomaly: ``HH:MM`` or ``DD.MM HH:MM``.

    Returns None when ``ts_start`` cannot be parsed. With ``include_date``,
    the date is prefixed; if either part is unavailable, whichever exists is
    returned.
    """
    moment = parse_local_dt(item.get("ts_start"))
    if not moment:
        return None
    hhmm = format_time_local(moment)
    if not include_date:
        return hhmm
    ddmm = format_date_local(moment)
    if ddmm and hhmm:
        return f"{ddmm} {hhmm}"
    return ddmm or hhmm
|
||
|
||
|
||
def build_anomalies_brief(tool_result, top_n=MAX_ANOMALIES_BRIEF):
    """Condense a ``get_anomalies`` tool result into a small dict for the answer LLM.

    Returns a dict with:

    * ``count``: number of anomalies after de-duplication;
    * ``stations_count``: distinct stations (by id, else by name), or None;
    * ``period_label``: ``build_period_label`` for the result range;
    * ``strongest``: brief of the anomaly with the largest numeric delta, or None;
    * ``top_items``: up to ``top_n`` further briefs in descending delta order,
      excluding the strongest one.

    Each brief holds ``station_name``, ``time_msk``, ``prev``, ``curr``,
    ``delta`` and ``kind``.
    """
    data = (tool_result or {}).get("data") or {}
    cleaned = dedupe_anomalies(data.get("items") or [])
    range_info = data.get("range") or {}
    include_date = range_span_days(range_info)
    period_label = build_period_label(range_info)

    # Count distinct stations; fall back to the name when an item has no id.
    stations = set()
    for item in cleaned:
        station_id = item.get("station_id")
        if station_id is not None:
            stations.add(station_id)
            continue
        station_name = item.get("station_name")
        if station_name:
            stations.add(station_name)
    stations_count = len(stations) if stations else None

    def to_brief(item):
        evidence = item.get("evidence") or {}
        return {
            "station_name": clean_station_name(item.get("station_name"), item.get("station_id")),
            "time_msk": anomaly_time_label(item, include_date),
            "prev": evidence.get("prev_aqi"),
            "curr": evidence.get("curr_aqi"),
            "delta": evidence.get("delta"),
            "kind": item.get("type"),
        }

    # anomaly_delta maps missing/non-numeric deltas to -1, so they sort last.
    sorted_by_delta = sorted(cleaned, key=anomaly_delta, reverse=True)
    strongest_item = sorted_by_delta[0] if sorted_by_delta else None
    strongest_key = anomaly_key(strongest_item) if strongest_item is not None else None

    top_items = []
    for item in sorted_by_delta:
        # Check the cap before appending so top_n <= 0 yields no items.
        if len(top_items) >= top_n:
            break
        if strongest_key is not None and anomaly_key(item) == strongest_key:
            continue
        top_items.append(to_brief(item))

    return {
        "count": len(cleaned),
        "stations_count": stations_count,
        "period_label": period_label,
        "strongest": to_brief(strongest_item) if strongest_item is not None else None,
        "top_items": top_items,
    }
|
||
|
||
|
||
def format_anomalies_full_list(tool_result, max_items=MAX_ANOMALIES_FULL_LIST, chrono=False):
    """Render a ``get_anomalies`` result as plain text, one anomaly per line.

    Each line is ``station — time — prev → curr (±delta)``; the time and value
    parts are omitted when unavailable, and a lone delta is shown as
    ``Δ <delta>``. Items are de-duplicated, then ordered by start time when
    ``chrono`` is true, otherwise by descending delta. At most ``max_items``
    lines are shown; a trailing note is added when the list was truncated.
    Returns a fixed "no anomalies" message when the result has no items.
    """
    data = (tool_result or {}).get("data") or {}
    items = data.get("items") or []
    if not items:
        return (
            "За выбранный период аномалий не найдено. "
            "Можно проверить 24 часа / 3 дня / другую станцию."
        )

    def signed(delta):
        # Keep the explicit "+" for increases, but render drops as "-5", not "+-5".
        if isinstance(delta, (int, float)) and not isinstance(delta, bool) and delta < 0:
            return str(delta)
        return f"+{delta}"

    cleaned = dedupe_anomalies(items)
    include_date = range_span_days(data.get("range") or {})

    if chrono:
        # Text sort; assumes ts_start values share one ISO-8601 format/offset.
        cleaned.sort(key=lambda x: x.get("ts_start") or "")
    else:
        cleaned.sort(key=anomaly_delta, reverse=True)

    # Clamp so a negative max_items cannot slice from the end of the list.
    limit = max(0, min(len(cleaned), max_items))
    lines = []
    for item in cleaned[:limit]:
        evidence = item.get("evidence") or {}
        prev = evidence.get("prev_aqi")
        curr = evidence.get("curr_aqi")
        delta = evidence.get("delta")

        parts = [clean_station_name(item.get("station_name"), item.get("station_id"))]
        time_label = anomaly_time_label(item, include_date)
        if time_label:
            parts.append(time_label)
        line = " — ".join(parts)

        if prev is not None and curr is not None and delta is not None:
            line = f"{line} — {prev} → {curr} ({signed(delta)})"
        elif delta is not None:
            line = f"{line} — Δ {delta}"
        lines.append(line)

    if len(cleaned) > limit:
        lines.append(
            f"Показала первые {limit}. Можно сузить период или выбрать станцию."
        )
    return "\n".join(lines)
|
||
|
||
|
||
def validate_range(from_ts, to_ts, max_hours=None, max_days=None):
    """Check a datetime range against optional size limits.

    Returns ``(error_code, message)``, or ``(None, None)`` when the range is
    acceptable. The checks, in order:

    * both ends must be present;
    * ``from_ts`` must not be after ``to_ts``;
    * the span must fit within ``max_hours`` and ``max_days`` (a falsy limit is
      not enforced).
    """
    if any(ts is None for ts in (from_ts, to_ts)):
        return "INVALID_RANGE", "Range is required"
    if to_ts < from_ts:
        return "INVALID_RANGE", "'from' must be <= 'to'"

    span = (to_ts - from_ts).total_seconds()
    for limit, unit_seconds, unit_name in ((max_hours, 3600, "hours"), (max_days, 86400, "days")):
        if limit and span > limit * unit_seconds:
            return "RANGE_TOO_LARGE", f"Range exceeds {limit} {unit_name}"
    return None, None
|
||
|
||
|
||
def tool_result_ok(tool_id, tool, data):
    """Build a successful ``tool_result_v1`` envelope around ``data``."""
    return dict(type="tool_result_v1", id=tool_id, tool=tool, ok=True, data=data)
|
||
|
||
|
||
def tool_result_error(tool_id, tool, code, message):
    """Build a failed ``tool_result_v1`` envelope carrying ``code`` and ``message``."""
    error = {"code": code, "message": message}
    return dict(type="tool_result_v1", id=tool_id, tool=tool, ok=False, error=error)
|
||
|
||
|
||
def _tool_range_or_error(args, tool, max_hours=None, max_days=None):
    """Parse and validate the ``range`` argument shared by all tool handlers.

    Returns ``(from_dt, to_dt, None)`` when the range is usable, otherwise
    ``(None, None, error)``, where ``error`` is a ready ``tool_result_v1``
    error dict. Its id is ``"missing"``; ``execute_tool_call`` overwrites it.
    """
    range_args = args.get("range")
    if not range_args:
        return None, None, tool_result_error("missing", tool, "INVALID_RANGE", "Range is required")
    if not isinstance(range_args, dict):
        return None, None, tool_result_error(
            "missing", tool, "INVALID_RANGE", "Range must be an object with 'from' and 'to'"
        )
    try:
        from_ts = parse_dt(range_args.get("from"))
        to_ts = parse_dt(range_args.get("to"))
    except HTTPException as exc:
        # parse_dt reports malformed timestamps via HTTPException. Surface that
        # as a tool error so the chat can explain it instead of the whole
        # request failing.
        return None, None, tool_result_error("missing", tool, "INVALID_RANGE", str(exc.detail))
    except (TypeError, ValueError) as exc:
        # NOTE(review): defensive, in case parse_dt is handed a non-string value.
        return None, None, tool_result_error("missing", tool, "INVALID_RANGE", str(exc))
    code, message = validate_range(from_ts, to_ts, max_hours=max_hours, max_days=max_days)
    if code:
        return None, None, tool_result_error("missing", tool, code, message)
    return from_ts, to_ts, None


def _int_or_default(value, default):
    """Coerce an LLM-supplied numeric argument to ``int``.

    Falsy values, and values ``int()`` rejects, fall back to ``default``.
    """
    if not value:
        return default
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def handle_get_snapshot(args):
    """Execute the ``get_snapshot`` tool.

    ``args["range"]`` (``from``/``to``) is required and limited to
    MAX_RANGE_HOURS_SNAPSHOT hours. ``args["limits"]["stations"]`` may lower
    the number of stations returned; it is capped at MAX_STATIONS_IN_SNAPSHOT.
    Returns a ``tool_result_v1`` dict; invalid arguments produce an error
    result rather than an exception.
    """
    if not isinstance(args, dict):
        args = {}
    from_ts, to_ts, error = _tool_range_or_error(
        args, "get_snapshot", max_hours=MAX_RANGE_HOURS_SNAPSHOT
    )
    if error is not None:
        return error

    limits = args.get("limits")
    if not isinstance(limits, dict):
        limits = {}
    stations_limit = _int_or_default(limits.get("stations"), MAX_STATIONS_IN_SNAPSHOT)
    # Cap at the configured maximum; a negative value would otherwise slice
    # stations off the end of the list.
    stations_limit = max(0, min(stations_limit, MAX_STATIONS_IN_SNAPSHOT))

    now = datetime.now(timezone.utc)
    summary = fetch_summary(now, from_ts, to_ts)
    selection = select_llm_stations(fetch_stations(now))
    stations = selection["stations"][:stations_limit]
    anomalies_summary = fetch_anomalies_summary(
        from_ts, to_ts, anomaly_types=["spike", "stuck"]
    )
    data = {
        "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)},
        "summary": summary,
        "stations": stations,
        "anomalies_summary": anomalies_summary,
        "stations_in_snapshot": len(stations),
        "stations_truncated": selection["stations_total"] > len(stations),
        "selection_rule": selection["selection_rule"],
    }
    return tool_result_ok("missing", "get_snapshot", data)


def handle_get_anomalies(args):
    """Execute the ``get_anomalies`` tool.

    ``args["range"]`` is required and limited to MAX_RANGE_DAYS_ANOMALIES days.
    ``args["types"]`` (a list, or a single string) is filtered to
    ``spike``/``stuck``; both are used when nothing valid remains.
    ``args["limit"]`` is kept within ``[1, MAX_ANOMALIES]`` (default
    MAX_ANOMALIES). Returns a ``tool_result_v1`` dict.
    """
    if not isinstance(args, dict):
        args = {}
    from_ts, to_ts, error = _tool_range_or_error(
        args, "get_anomalies", max_days=MAX_RANGE_DAYS_ANOMALIES
    )
    if error is not None:
        return error

    types = args.get("types") or ["spike", "stuck"]
    if isinstance(types, str):
        # A bare string would otherwise be iterated character by character.
        types = [types]
    elif not isinstance(types, (list, tuple, set)):
        types = []
    allowed_types = [t for t in types if t in ("spike", "stuck")] or ["spike", "stuck"]

    limit = _int_or_default(args.get("limit"), MAX_ANOMALIES)
    # Keep within [1, MAX_ANOMALIES]; the value is passed on to fetch_anomalies.
    limit = max(1, min(limit, MAX_ANOMALIES))

    data = fetch_anomalies(from_ts, to_ts, limit, None, allowed_types)
    return tool_result_ok("missing", "get_anomalies", data)


def handle_get_station_history(args):
    """Execute the ``get_station_history`` tool.

    ``args["range"]`` is required and limited to MAX_RANGE_DAYS_HISTORY days.
    ``args["station_id"]`` is required and must be integer-like.
    ``args["step_seconds"]`` defaults to, and is never below,
    MIN_HISTORY_STEP_SECONDS. Returns a ``tool_result_v1`` dict.
    """
    if not isinstance(args, dict):
        args = {}
    from_ts, to_ts, error = _tool_range_or_error(
        args, "get_station_history", max_days=MAX_RANGE_DAYS_HISTORY
    )
    if error is not None:
        return error

    station_id = args.get("station_id")
    if station_id is None:
        return tool_result_error(
            "missing", "get_station_history", "INVALID_ARGS", "station_id is required"
        )
    try:
        station_id = int(station_id)
    except (TypeError, ValueError):
        return tool_result_error(
            "missing", "get_station_history", "INVALID_ARGS", "station_id must be an integer"
        )

    step_seconds = _int_or_default(args.get("step_seconds"), MIN_HISTORY_STEP_SECONDS)
    # Enforce the minimum step: a tiny step over a long range could request a
    # very large number of points.
    step_seconds = max(step_seconds, MIN_HISTORY_STEP_SECONDS)

    data = fetch_station_history(station_id, from_ts, to_ts, step_seconds)
    return tool_result_ok("missing", "get_station_history", data)
|
||
|
||
|
||
def execute_tool_call(decision):
    """Dispatch a ``tool_call_v1`` decision to its handler and return the result.

    The result always carries the decision's id (or a freshly generated
    ``call_xxxxxxxx`` one) and tool name. Unknown tools produce an
    ``INVALID_TOOL`` error result.
    """
    call_id = decision.get("id") or f"call_{uuid.uuid4().hex[:8]}"
    tool_name = decision.get("tool")
    call_args = decision.get("args") or {}

    handlers = {
        "get_snapshot": handle_get_snapshot,
        "get_anomalies": handle_get_anomalies,
        "get_station_history": handle_get_station_history,
    }
    # Only string names can match; this also avoids hashing odd LLM output.
    handler = handlers.get(tool_name) if isinstance(tool_name, str) else None

    if handler is None:
        result = tool_result_error(call_id, tool_name, "INVALID_TOOL", "Tool not supported")
    else:
        result = handler(call_args)

    # Handlers use a placeholder id; stamp the real id and tool name here.
    result.update(id=call_id, tool=tool_name)
    return result
|
||
|
||
|
||
def build_answer(
    message,
    message_type,
    snapshot_json=None,
    tool_result=None,
    anomalies_brief=None,
    anomalies_mode=None,
):
    """Ask the answer LLM to respond to ``message`` and return its reply.

    The conversation is built in this order:

    1. the ANSWER_PROMPT system message;
    2. a user header with the request type, the question and (optionally) the
       anomalies mode;
    3. optional fenced-JSON attachments: snapshot, tool result, anomalies brief.

    Calls ``call_llm`` with temperature 0.0 and max_tokens 512.
    """
    req_type = (message_type or "unknown").upper()
    header = f"REQUEST_TYPE: {req_type}\nQUESTION: {message}"
    if anomalies_mode:
        header = f"{header}\nANOMALIES_MODE: {anomalies_mode}"

    attachments = []
    if snapshot_json:
        attachments.append("SNAPSHOT_JSON:\n```json\n" + snapshot_json + "\n```")
    if tool_result is not None:
        tool_text = json.dumps(tool_result, ensure_ascii=False)
        attachments.append("TOOL_RESULT:\n```json\n" + tool_text + "\n```")
    if anomalies_brief is not None:
        brief_text = json.dumps(anomalies_brief, ensure_ascii=False)
        attachments.append("ANOMALIES_BRIEF:\n```json\n" + brief_text + "\n```")

    messages = [
        {"role": "system", "content": ANSWER_PROMPT},
        {"role": "user", "content": header},
    ]
    messages.extend({"role": "user", "content": content} for content in attachments)
    return call_llm(messages, temperature=0.0, max_tokens=512)
|
||
|
||
|
||
def build_trace_payload(
    message,
    request_type,
    decision,
    snapshot,
    snapshot_llm,
    tool_result,
    tool_result_llm,
    anomalies_brief=None,
    anomalies_mode=None,
    period=None,
):
    """Assemble the debug trace returned with eco answers (None for other types).

    ``llm_input`` mirrors what the answer LLM saw. It holds the anomalies
    brief (plus mode) when one was used; otherwise the localized snapshot
    and/or tool result. It is None when nothing was sent. ``raw_json`` is the
    raw tool result when present, else the raw snapshot.
    """
    if request_type != "eco":
        return None

    if anomalies_brief is not None:
        llm_input = {"anomalies_brief": anomalies_brief}
        if anomalies_mode:
            llm_input["anomalies_mode"] = anomalies_mode
    else:
        candidates = (("snapshot", snapshot_llm), ("tool_result", tool_result_llm))
        llm_input = {name: value for name, value in candidates if value is not None}

    return {
        "trace_id": str(uuid.uuid4()),
        "request_type": request_type,
        "question": message,
        "router": decision,
        "llm_input": llm_input or None,
        "raw_json": snapshot if tool_result is None else tool_result,
        "period": period,
    }
|
||
|
||
|
||
@app.get("/health")
def health():
    """Liveness probe; always answers ``{"ok": True}``."""
    status = {"ok": True}
    return status
|
||
|
||
|
||
@app.get("/aip/waqi/summary")
def waqi_summary(
    _auth: None = Depends(require_api_key),
    from_ts: str | None = Query(default=None, alias="from"),
    to_ts: str | None = Query(default=None, alias="to"),
):
    """Return ``fetch_summary`` for the requested window.

    ``to`` defaults to now (UTC) and ``from`` to 24 hours before ``to``. The
    window end is also passed as the reference time. Errors: 400 when
    ``from`` > ``to``; RuntimeError becomes HTTP 500.
    """
    try:
        window_end = parse_dt(to_ts) or datetime.now(timezone.utc)
        window_start = parse_dt(from_ts) or (window_end - timedelta(hours=24))
        if window_start > window_end:
            raise HTTPException(status_code=400, detail="'from' must be <= 'to'")
        return fetch_summary(window_end, window_start, window_end)
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
@app.get("/aip/waqi/anomalies")
def waqi_anomalies(
    _auth: None = Depends(require_api_key),
    from_ts: str | None = Query(default=None, alias="from"),
    to_ts: str | None = Query(default=None, alias="to"),
    limit: int = Query(default=200, ge=1, le=1000),
    station_id: int | None = Query(default=None),
    anomaly_type: str | None = Query(default=None, alias="type"),
):
    """Return ``fetch_anomalies`` for the requested window and filters.

    ``to`` defaults to now (UTC) and ``from`` to 24 hours before ``to``.
    Errors: 400 when ``from`` > ``to``; RuntimeError becomes HTTP 500.
    """
    try:
        window_end = parse_dt(to_ts) or datetime.now(timezone.utc)
        window_start = parse_dt(from_ts) or (window_end - timedelta(hours=24))
        if window_start > window_end:
            raise HTTPException(status_code=400, detail="'from' must be <= 'to'")
        return fetch_anomalies(window_start, window_end, limit, station_id, anomaly_type)
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
@app.get("/aip/waqi/stations")
def waqi_stations(
    _auth: None = Depends(require_api_key),
):
    """List stations (``fetch_stations``) with the freshness windows used.

    RuntimeError becomes HTTP 500.
    """
    try:
        current = datetime.now(timezone.utc)
        windows = {
            "ingested_minutes": INGESTED_WINDOW_MINUTES,
            "observed_fresh_minutes": OBSERVED_FRESH_MINUTES,
            "observed_recent_hours": OBSERVED_RECENT_HOURS,
        }
        return {
            "now": dt_iso(current),
            "windows": windows,
            "items": fetch_stations(current),
        }
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
@app.get("/aip/waqi/history")
def waqi_history(
    _auth: None = Depends(require_api_key),
    date: str = Query(...),
    since: str | None = Query(default=None),
):
    """Return history rows for one day (bounds from ``day_bounds_utc``).

    The optional ``since`` timestamp is passed to ``fetch_history``. Errors:
    400 when ``date`` does not parse to a day; RuntimeError becomes HTTP 500.
    """
    try:
        day = parse_date(date)
        if day is None:
            raise HTTPException(status_code=400, detail="'date' is required")
        day_start, day_end = day_bounds_utc(day)
        since_dt = None if not since else parse_dt(since)
        rows = fetch_history(day_start, day_end, since_dt)
        return {
            "date": date,
            "range": {"from": dt_iso(day_start), "to": dt_iso(day_end)},
            "items": rows,
        }
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
@app.get("/aip/waqi/history_bounds")
def waqi_history_bounds(
    _auth: None = Depends(require_api_key),
):
    """Return the first and last dates with history, as ``local_date_key`` values.

    RuntimeError becomes HTTP 500.
    """
    try:
        earliest, latest = fetch_history_bounds()
        bounds = {"tz": UI_TZ}
        bounds["min_date"] = local_date_key(earliest)
        bounds["max_date"] = local_date_key(latest)
        return bounds
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||
|
||
|
||
@app.get("/aip/waqi/report")
def waqi_report():
    """Placeholder report endpoint; currently returns an empty JSON object."""
    # NOTE(review): unlike the other /aip/waqi routes, this one has no
    # require_api_key dependency — confirm that is intended.
    return dict()
|
||
|
||
|
||
def _chat_station_focus(request_body, message):
    """Resolve which station a chat request is about.

    ``station_focus`` (or ``station_id``) from the request body is used when
    it is a non-zero integer-like value. Otherwise the id is extracted from
    the message text via ``extract_station_id``.
    """
    station_focus = request_body.get("station_focus") or request_body.get("station_id")
    if station_focus is not None:
        try:
            station_focus = int(station_focus)
        except (TypeError, ValueError):
            station_focus = None
    return station_focus or extract_station_id(message)


def _chat_eco_answer(message, message_type, snapshot_llm_json, tool_result, tool_result_llm):
    """Produce the answer text for an eco question.

    Returns ``(answer, anomalies_brief, anomalies_mode)``. The last two are
    set only for a successful, non-empty ``get_anomalies`` result (mode
    ``"expand"`` when the user asked for the full list, else ``"brief"``).
    """
    if tool_result and tool_result.get("ok") is False:
        return format_tool_error(tool_result), None, None

    if tool_result and tool_result.get("ok") and tool_result.get("tool") == "get_anomalies":
        items = (tool_result.get("data") or {}).get("items") or []
        if not items:
            answer = (
                "За выбранный период аномалий не найдено. "
                "Можно проверить 24 часа / 3 дня / другую станцию."
            )
            return answer, None, None
        brief = build_anomalies_brief(tool_result, top_n=MAX_ANOMALIES_BRIEF)
        if wants_anomaly_full_list(message):
            full_list = format_anomalies_full_list(
                tool_result,
                max_items=MAX_ANOMALIES_FULL_LIST,
                chrono=wants_anomaly_chrono(message),
            )
            intro = build_answer(
                message,
                message_type,
                anomalies_brief=brief,
                anomalies_mode="expand",
            )
            return f"{intro}\n{full_list}".strip(), brief, "expand"
        answer = build_answer(
            message,
            message_type,
            anomalies_brief=brief,
            anomalies_mode="brief",
        )
        return answer, brief, "brief"

    if tool_result_is_no_data(tool_result):
        return "нет данных", None, None

    return build_answer(message, message_type, snapshot_llm_json, tool_result_llm), None, None


@app.post("/llm/chat")
def llm_chat(payload: dict = Body(...)):
    """Answer a chat message about air quality.

    Flow:

    1. Classify the message.
    2. Build a snapshot for the requested range (default: the last 24 hours).
    3. Let the router choose a tool, then apply keyword fallbacks and argument
       sanitizing.
    4. Run the tool and ask the LLM for the final answer.

    Smalltalk/meta requests are answered directly, without tools or a trace.
    Errors: 400 for a missing message or ``from`` > ``to``; RuntimeError
    becomes HTTP 500.
    """
    message = str(payload.get("message") or "").strip()
    if not message:
        raise HTTPException(status_code=400, detail="'message' is required")
    mode = payload.get("mode") or "live"
    range_payload = payload.get("range")
    if not isinstance(range_payload, dict):
        # Tolerate a missing or malformed "range" field instead of failing with a 500.
        range_payload = {}
    from_raw = payload.get("from") or range_payload.get("from")
    to_raw = payload.get("to") or range_payload.get("to")
    try:
        message_type = classify_message(message)
        to_dt = parse_dt(to_raw) or datetime.now(timezone.utc)
        from_dt = parse_dt(from_raw) or (to_dt - timedelta(hours=24))
        if from_dt > to_dt:
            raise HTTPException(status_code=400, detail="'from' must be <= 'to'")

        station_focus = _chat_station_focus(payload, message)
        now = datetime.now(timezone.utc)
        snapshot = build_snapshot(
            now,
            from_dt,
            to_dt,
            mode,
            station_focus,
            MAX_STATIONS_IN_SNAPSHOT,
        )
        snapshot_llm = localize_llm_times(snapshot)
        snapshot_json = json.dumps(snapshot, ensure_ascii=False)
        snapshot_llm_json = json.dumps(snapshot_llm, ensure_ascii=False)

        decision = run_router(snapshot_json, message)
        effective_type = (decision.get("request_type") or message_type or "eco").lower()
        if effective_type in ("smalltalk", "meta"):
            decision = {
                "type": "no_tool",
                "reason": "no_tool_for_smalltalk_or_meta",
                "request_type": effective_type,
            }
            answer = build_answer(message, effective_type)
            response = {
                "request_id": snapshot["request_id"],
                "router": decision,
                "tool_result": None,
                "answer": answer,
                "period": None,
            }
            return JSONResponse(response, media_type="application/json; charset=utf-8")

        message_type = "eco"
        decision = apply_fallbacks(decision, message, snapshot, station_focus, message_type)
        decision["request_type"] = "eco"
        # sanitize_tool_call fills the default range and maps max_anomalies ->
        # limit for get_anomalies, so no separate normalization is needed here.
        decision = sanitize_tool_call(decision, station_focus, snapshot)

        tool_result = execute_tool_call(decision) if decision["type"] == "tool_call_v1" else None
        tool_result_llm = localize_llm_times(tool_result) if tool_result else None

        answer, anomalies_brief, anomalies_mode = _chat_eco_answer(
            message, message_type, snapshot_llm_json, tool_result, tool_result_llm
        )
        answer = strip_iso_period_lines(answer)
        period = build_period_payload(snapshot, tool_result)
        trace_payload = build_trace_payload(
            message,
            "eco",
            decision,
            snapshot,
            snapshot_llm,
            tool_result,
            tool_result_llm,
            anomalies_brief=anomalies_brief,
            anomalies_mode=anomalies_mode,
            period=period,
        )
        # Named "response" so the request body ("payload") is not shadowed.
        response = {
            "request_id": snapshot["request_id"],
            "router": decision,
            "tool_result": tool_result,
            "answer": answer,
            "period": period,
            "trace_payload": trace_payload,
        }
        return JSONResponse(response, media_type="application/json; charset=utf-8")
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|