import json import os import re import uuid from datetime import datetime, timedelta, timezone import httpx import psycopg2 from fastapi import Body, Depends, FastAPI, HTTPException, Query, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse app = FastAPI() DATABASE_URL = os.environ.get("DATABASE_URL") API_KEY = os.environ.get("API_KEY", "").strip() INGESTED_WINDOW_MINUTES = 30 OBSERVED_FRESH_MINUTES = 60 OBSERVED_RECENT_HOURS = 4 CORS_ORIGINS = [ origin.strip() for origin in os.environ.get("CORS_ORIGINS", "http://localhost:5173").split(",") if origin.strip() ] LOCAL_TZ = timezone(timedelta(hours=3)) UI_TZ = "Europe/Moscow" LLM_BASE_URL = os.environ.get("LLM_BASE_URL", "").rstrip("/") LLM_MODEL = os.environ.get("LLM_MODEL", "saiga_nemo_12b_gguf") LLM_TIMEOUT = float(os.environ.get("LLM_TIMEOUT", "60")) LLM_MAX_STATIONS = int(os.environ.get("LLM_MAX_STATIONS", "30")) MAX_TOOL_ITERATIONS = 3 MAX_RANGE_HOURS_SNAPSHOT = 72 MAX_RANGE_DAYS_ANOMALIES = 14 MAX_RANGE_DAYS_HISTORY = 31 MAX_STATIONS_IN_SNAPSHOT = LLM_MAX_STATIONS MAX_ANOMALIES = 100 MAX_ANOMALIES_ANSWER = 10 MAX_ANOMALIES_BRIEF = 3 MAX_ANOMALIES_FULL_LIST = 30 MAX_HISTORY_POINTS = 500 MIN_HISTORY_STEP_SECONDS = 300 TOOLS_WHITELIST = ["get_snapshot", "get_anomalies", "get_station_history"] REQUEST_TYPES = ["smalltalk", "meta", "eco"] ANOMALY_TOPIC_TRIGGERS = [ "аномал", "всплеск", "скач", "скачок", "пик", "пики", "залип", "застрял", "подозр", "резк", "spike", "stuck", "outlier", "flatline", "stale", "not updating", ] ANOMALY_DETAIL_TRIGGERS = [ "перечисли", "список", "покажи все", "подроб", "подтверж", "значени", "дельта", "по станциям", "когда", "таймстемп", "evidence", "timestamps", "details", "list", "show all", "where", "which", ] ANOMALY_FULL_LIST_TRIGGERS = [ "полный список", "покажи все", "перечисли все", "все аномалии", "все всплески", "покажи полный", "show all anomalies", "full list", ] ANOMALY_TIME_ORDER_TRIGGERS = [ "по времени", "хронолог", "по порядку", "сначала", "по хронологии", "chronolog", ] HISTORY_TRIGGERS = [ "истор", "график", "динамик", "тренд", "таймлайн", "по часам", "по минутам", "за период", "серия", "кривая", "trend", "history", "timeline", "time series", "series", "chart", "plot", "over time", ] SMALLTALK_TRIGGERS = [ "привет", "здрав", "добрый", "как дела", "спасибо", "спс", "ок", "окей", "оке", "лол", "тест", "провер", "работает", "напиши фразу", "напиши слово", "угу", "ага", "ясно", "понятно", "ладно", "алло", "ты тут", "hi", "hello", "thanks", "ok", "test", "write a phrase", "are you there", "ping", ] META_TRIGGERS = [ "почему ты", "как работает", "что такое", "объясни", "tool", "snapshot", "scope", "evidence", "prompt", "model", "llm", "system", "tools", "оркестратор", "router", "answer", "контекст", "json", "schema", "endpoint", "api", "backend", ] ECO_TRIGGERS = [ "aqi", "air quality", "воздух", "качество воздуха", "станц", "измерен", "свеж", "данные", "ingest", "observed", "fresh", "recent", "stale", "pm2.5", "pm10", "anomal", "station", "history", "timeline", ] if CORS_ORIGINS: allow_credentials = True if CORS_ORIGINS == ["*"]: allow_credentials = False app.add_middleware( CORSMiddleware, allow_origins=CORS_ORIGINS, allow_credentials=allow_credentials, allow_methods=["*"], allow_headers=["*"], ) def require_db_url(): if not DATABASE_URL: raise RuntimeError("DATABASE_URL is not set") def require_api_key(request: Request): if not API_KEY: return provided = request.headers.get("x-api-key") or "" if provided != API_KEY: raise HTTPException(status_code=401, detail="invalid api key") def require_llm_base_url(): if not LLM_BASE_URL: raise RuntimeError("LLM_BASE_URL is not set") def text_has_any(text, keywords): return any(keyword in text for keyword in keywords) def classify_message(message): text = (message or "").lower() ecoish = ( text_has_any(text, ECO_TRIGGERS) or text_has_any(text, ANOMALY_TOPIC_TRIGGERS) or text_has_any(text, HISTORY_TRIGGERS) ) if ecoish: return "eco" if text_has_any(text, META_TRIGGERS): return "meta" if text_has_any(text, SMALLTALK_TRIGGERS): return "smalltalk" text = text.strip() if not text: return "smalltalk" return "eco" def extract_station_id(text): if not text: return None match = re.search(r"(?:station|станц)[^0-9]*(\d+)", text, re.IGNORECASE) if not match: return None try: return int(match.group(1)) except ValueError: return None def parse_dt(value): if not value: return None value = value.strip() if value.endswith("Z"): value = value[:-1] + "+00:00" try: dt = datetime.fromisoformat(value) except ValueError as exc: raise HTTPException(status_code=400, detail=f"Invalid datetime: {value}") from exc if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) ISO_LLM_DATETIME_RE = re.compile( r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?$" ) def format_llm_timestamp(value): if not isinstance(value, str) or not ISO_LLM_DATETIME_RE.match(value): return value try: dt = parse_dt(value) except HTTPException: return value local = dt.astimezone(LOCAL_TZ) return local.strftime("%Y-%m-%d %H:%M") def localize_llm_times(value): if isinstance(value, dict): return {key: localize_llm_times(val) for key, val in value.items()} if isinstance(value, list): return [localize_llm_times(item) for item in value] return format_llm_timestamp(value) def parse_date(value): if not value: return None try: return datetime.strptime(value, "%Y-%m-%d").date() except ValueError as exc: raise HTTPException(status_code=400, detail=f"Invalid date: {value}") from exc def day_bounds_utc(day): local_start = datetime(day.year, day.month, day.day, tzinfo=LOCAL_TZ) local_end = local_start + timedelta(days=1) return local_start.astimezone(timezone.utc), local_end.astimezone(timezone.utc) def dt_iso(dt): if dt is None: return None return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") def local_date_key(dt): if dt is None: return None if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(LOCAL_TZ).date().isoformat() def build_station_snapshots(cur): cur.execute( """ WITH last_ingested AS ( SELECT station_id, MAX(ingested_ts) AS last_ingested_ts FROM measurements GROUP BY station_id ), last_observed AS ( SELECT DISTINCT ON (station_id) station_id, observed_ts AS last_observed_ts FROM measurements ORDER BY station_id, observed_ts DESC ), last_aqi AS ( SELECT DISTINCT ON (station_id) station_id, observed_ts AS last_aqi_ts, aqi AS last_aqi FROM measurements WHERE aqi IS NOT NULL ORDER BY station_id, observed_ts DESC ) SELECT s.id, s.provider_uid, s.name, s.lat, s.lon, li.last_ingested_ts, lo.last_observed_ts, la.last_aqi, la.last_aqi_ts FROM stations s LEFT JOIN last_ingested li ON li.station_id = s.id LEFT JOIN last_observed lo ON lo.station_id = s.id LEFT JOIN last_aqi la ON la.station_id = s.id ORDER BY s.id; """ ) return cur.fetchall() def classify_station(now, last_ingested_ts, last_observed_ts): ingested_cutoff = now - timedelta(minutes=INGESTED_WINDOW_MINUTES) fresh_cutoff = now - timedelta(minutes=OBSERVED_FRESH_MINUTES) recent_cutoff = now - timedelta(hours=OBSERVED_RECENT_HOURS) in_flow = bool(last_ingested_ts and last_ingested_ts >= ingested_cutoff) fresh = bool(in_flow and last_observed_ts and last_observed_ts >= fresh_cutoff) recent = bool( in_flow and last_observed_ts and recent_cutoff <= last_observed_ts < fresh_cutoff ) stale = bool(in_flow and (not last_observed_ts or last_observed_ts < recent_cutoff)) return in_flow, fresh, recent, stale def sort_stations_for_llm(stations): def key(item): aqi = item.get("last_aqi") aqi_missing = aqi is None observed = item.get("last_observed_ts") observed_dt = parse_dt(observed) if observed else None observed_ts = observed_dt.timestamp() if observed_dt else float("-inf") station_id = item.get("station_id") or 0 return ( aqi_missing, -(float(aqi) if aqi is not None else 0.0), -observed_ts, station_id, ) return sorted(stations, key=key) def select_llm_stations(stations): ordered = sort_stations_for_llm(stations) limited = ordered[:LLM_MAX_STATIONS] total = len(ordered) return { "stations": limited, "stations_total": total, "stations_in_snapshot": len(limited), "stations_truncated": total > len(limited), "selection_rule": "aqi_desc_top_n", } def fetch_stations(now): require_db_url() items = [] with psycopg2.connect(DATABASE_URL) as conn: with conn.cursor() as cur: for ( station_id, provider_uid, name, lat, lon, last_ingested_ts, last_observed_ts, last_aqi, last_aqi_ts, ) in build_station_snapshots(cur): in_flow, fresh, recent, stale = classify_station( now, last_ingested_ts, last_observed_ts ) items.append( { "station_id": station_id, "provider_uid": provider_uid, "name": name, "lat": lat, "lon": lon, "last_aqi": last_aqi, "last_aqi_ts": dt_iso(last_aqi_ts), "last_observed_ts": dt_iso(last_observed_ts), "last_ingested_ts": dt_iso(last_ingested_ts), "in_flow": in_flow, "fresh": fresh, "recent": recent, "stale": stale, } ) return items def fetch_summary(now, from_ts, to_ts): require_db_url() stations_total = 0 inflow_count = 0 fresh_count = 0 recent_count = 0 stale_count = 0 last_ingested = None last_observed = None aqi_values = [] with psycopg2.connect(DATABASE_URL) as conn: with conn.cursor() as cur: cur.execute("SELECT COUNT(*) FROM stations;") stations_total = cur.fetchone()[0] for ( _station_id, _provider_uid, _name, _lat, _lon, last_ingested_ts, last_observed_ts, last_aqi, _last_aqi_ts, ) in build_station_snapshots(cur): if last_ingested_ts and (last_ingested is None or last_ingested_ts > last_ingested): last_ingested = last_ingested_ts if last_observed_ts and (last_observed is None or last_observed_ts > last_observed): last_observed = last_observed_ts in_flow, fresh, recent, stale = classify_station( now, last_ingested_ts, last_observed_ts ) if not in_flow: continue inflow_count += 1 if fresh: fresh_count += 1 elif recent: recent_count += 1 else: stale_count += 1 if last_aqi is not None: aqi_values.append(last_aqi) avg_now = None max_now = None if aqi_values: avg_now = sum(aqi_values) / len(aqi_values) max_now = max(aqi_values) return { "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)}, "windows": { "ingested_minutes": INGESTED_WINDOW_MINUTES, "observed_fresh_minutes": OBSERVED_FRESH_MINUTES, "observed_recent_hours": OBSERVED_RECENT_HOURS, }, "stations_total": stations_total, "in_flow": inflow_count, "fresh": fresh_count, "recent": recent_count, "stale": stale_count, "aqi_now": { "avg": avg_now, "max": max_now, "count": len(aqi_values), }, "last_ingested_ts": dt_iso(last_ingested), "last_observed_ts": dt_iso(last_observed), "now": dt_iso(now), } def fetch_anomalies_summary(from_ts, to_ts, anomaly_types=None, station_id=None): require_db_url() params = [from_ts, to_ts] where = ["a.ts_start >= %s", "a.ts_start <= %s"] if station_id is not None: where.append("a.station_id = %s") params.append(station_id) types = None if anomaly_types: if isinstance(anomaly_types, str): types = [anomaly_types] else: types = list(anomaly_types) if types: where.append("a.type = ANY(%s)") params.append(types) where_sql = " AND ".join(where) with psycopg2.connect(DATABASE_URL) as conn: with conn.cursor() as cur: cur.execute( f""" SELECT COUNT(*) FROM anomalies a WHERE {where_sql}; """, params, ) count = cur.fetchone()[0] return {"has_anomalies": count > 0, "count": count} def build_snapshot(now, from_ts, to_ts, mode, station_focus=None, max_stations=None): summary = fetch_summary(now, from_ts, to_ts) stations = fetch_stations(now) selection = select_llm_stations(stations) if max_stations and max_stations < selection["stations_in_snapshot"]: selection["stations"] = selection["stations"][:max_stations] selection["stations_in_snapshot"] = len(selection["stations"]) selection["stations_truncated"] = selection["stations_total"] > selection["stations_in_snapshot"] anomalies_summary = fetch_anomalies_summary( from_ts, to_ts, anomaly_types=["spike", "stuck"], station_id=station_focus, ) return { "type": "snapshot_v1", "request_id": str(uuid.uuid4()), "generated_at": dt_iso(now), "ui": { "mode": mode, "range": { "from": dt_iso(from_ts), "to": dt_iso(to_ts), "tz": UI_TZ, }, "station_focus": station_focus, }, "limits": { "max_tool_iterations": MAX_TOOL_ITERATIONS, "max_range_hours_snapshot": MAX_RANGE_HOURS_SNAPSHOT, "max_range_days_anomalies": MAX_RANGE_DAYS_ANOMALIES, "max_range_days_history": MAX_RANGE_DAYS_HISTORY, "max_stations_in_snapshot": MAX_STATIONS_IN_SNAPSHOT, "max_anomalies": MAX_ANOMALIES, "max_history_points": MAX_HISTORY_POINTS, "min_history_step_seconds": MIN_HISTORY_STEP_SECONDS, }, "data": { "summary": summary, "stations": selection["stations"], "anomalies_summary": anomalies_summary, "stations_in_snapshot": selection["stations_in_snapshot"], "stations_truncated": selection["stations_truncated"], "selection_rule": selection["selection_rule"], }, "tools_whitelist": TOOLS_WHITELIST, } def normalize_json(value): if value is None or isinstance(value, (dict, list)): return value if isinstance(value, str): try: return json.loads(value) except json.JSONDecodeError: return value return value def fetch_anomalies(from_ts, to_ts, limit, station_id, anomaly_types): require_db_url() params = [from_ts, to_ts] where = ["a.ts_start >= %s", "a.ts_start <= %s"] if station_id is not None: where.append("a.station_id = %s") params.append(station_id) types = None if anomaly_types: if isinstance(anomaly_types, str): types = [anomaly_types] else: types = list(anomaly_types) if types: where.append("a.type = ANY(%s)") params.append(types) params.append(limit) where_sql = " AND ".join(where) with psycopg2.connect(DATABASE_URL) as conn: with conn.cursor() as cur: cur.execute( f""" SELECT a.id, a.station_id, s.name, a.type, a.severity, a.confidence, a.ts_start, a.ts_end, a.metric, a.evidence, a.created_at FROM anomalies a LEFT JOIN stations s ON s.id = a.station_id WHERE {where_sql} ORDER BY a.ts_start DESC LIMIT %s; """, params, ) rows = cur.fetchall() items = [] for ( anomaly_id, row_station_id, station_name, row_type, severity, confidence, ts_start, ts_end, metric, evidence, created_at, ) in rows: items.append( { "id": anomaly_id, "station_id": row_station_id, "station_name": station_name, "type": row_type, "severity": severity, "confidence": confidence, "ts_start": dt_iso(ts_start), "ts_end": dt_iso(ts_end), "metric": metric, "evidence": normalize_json(evidence), "created_at": dt_iso(created_at), } ) return { "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)}, "items": items, } def fetch_history(from_ts, to_ts, since_ts): require_db_url() params = [from_ts, to_ts] conditions = ["m.observed_ts >= %s", "m.observed_ts < %s"] if since_ts and since_ts > from_ts: conditions.append("m.observed_ts > %s") params.append(since_ts) where_sql = " AND ".join(conditions) with psycopg2.connect(DATABASE_URL) as conn: with conn.cursor() as cur: cur.execute( f""" SELECT m.station_id, s.provider_uid, s.name, s.lat, s.lon, m.observed_ts, m.ingested_ts, m.aqi FROM measurements m JOIN stations s ON s.id = m.station_id WHERE {where_sql} ORDER BY m.observed_ts ASC, m.station_id ASC; """, params, ) rows = cur.fetchall() items = [] for ( station_id, provider_uid, name, lat, lon, observed_ts, ingested_ts, aqi, ) in rows: items.append( { "station_id": station_id, "provider_uid": provider_uid, "name": name, "lat": lat, "lon": lon, "observed_ts": dt_iso(observed_ts), "ingested_ts": dt_iso(ingested_ts), "aqi": aqi, } ) return items def fetch_history_bounds(): require_db_url() with psycopg2.connect(DATABASE_URL) as conn: with conn.cursor() as cur: cur.execute( """ SELECT MIN(observed_ts), MAX(observed_ts) FROM measurements; """ ) row = cur.fetchone() if not row: return None, None return row[0], row[1] def compute_history_step(from_ts, to_ts, requested_step_seconds): range_seconds = max((to_ts - from_ts).total_seconds(), 1) step_seconds = requested_step_seconds or MIN_HISTORY_STEP_SECONDS step_seconds = max(step_seconds, MIN_HISTORY_STEP_SECONDS) min_for_points = int(range_seconds / MAX_HISTORY_POINTS) + 1 if min_for_points > step_seconds: step_seconds = min_for_points return step_seconds def fetch_station_history(station_id, from_ts, to_ts, step_seconds): require_db_url() with psycopg2.connect(DATABASE_URL) as conn: with conn.cursor() as cur: cur.execute( """ SELECT observed_ts, aqi FROM measurements WHERE station_id = %s AND observed_ts >= %s AND observed_ts <= %s ORDER BY observed_ts ASC; """, [station_id, from_ts, to_ts], ) rows = cur.fetchall() step_seconds = compute_history_step(from_ts, to_ts, step_seconds) buckets = {} for observed_ts, aqi in rows: index = int((observed_ts - from_ts).total_seconds() // step_seconds) bucket_ts = from_ts + timedelta(seconds=index * step_seconds) bucket = buckets.setdefault(bucket_ts, {"sum": 0.0, "count": 0}) if aqi is not None: bucket["sum"] += float(aqi) bucket["count"] += 1 points = [] for bucket_ts in sorted(buckets): count = buckets[bucket_ts]["count"] if count <= 0: continue points.append( { "ts": dt_iso(bucket_ts), "aqi": buckets[bucket_ts]["sum"] / count, } ) return { "station_id": station_id, "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)}, "step_seconds": step_seconds, "points": points, } ROUTER_PROMPT = ( "Ты — Router AIP Ecology. Верни ТОЛЬКО JSON по схеме router_decision_v1.\n" "Всегда указывай request_type: smalltalk | meta | eco.\n" "\n" "Определение request_type:\n" "- smalltalk: привет/как дела/спасибо/ок/анекдот/тест/поболтать.\n" "- meta: вопросы «почему ты», «при чем тут», «ты о чем», вопросы о системе/промптах/tools/snapshot.\n" "- eco: вопросы про воздух/AQI/станции/аномалии/историю/диапазон/свежесть.\n" "\n" "Правила:\n" "- Если request_type = smalltalk или meta -> type = no_tool.\n" "- Если request_type = eco и snapshot_v1 достаточно -> type = no_tool, reason = snapshot_enough.\n" "- Если request_type = eco и нужны детали -> type = tool_call_v1.\n" "Инструменты:\n" "- get_anomalies: списки/подтверждение аномалий.\n" "- get_station_history: таймсерия по станции.\n" "- get_snapshot: только если нужен другой диапазон/режим или snapshot отсутствует.\n" "Без лишнего текста." ) ANSWER_PROMPT = ( "Ты — аналитик AIP Ecology.\n" "REQUEST_TYPE уже определён на backend. Не переклассифицируй.\n" "\n" "Правила по REQUEST_TYPE:\n" "- SMALLTALK/TEST: ответь кратко (1-2 предложения) на языке пользователя. " "НЕ упоминай snapshot_v1, tool_result_v1, качество воздуха/AQI, мониторинг, станции, аномалии, диапазоны, временные метки, " "\"нет данных\", ingestion/observation или аналитику.\n" "- META: кратко объясни на языке пользователя. " "Не анализируй качество воздуха и не упоминай конкретные станции, если явно не спросили.\n" "- ECO: отвечай ТОЛЬКО по SNAPSHOT_JSON и TOOL_RESULT. Не выдумывай.\n" "\n" "Правила ECO:\n" "- Не добавляй строку вида \"Период: ...\" — период показывает UI.\n" "- Упоминай аномалии ТОЛЬКО если пользователь спросил про аномалии/риски ИЛИ TOOL_RESULT.tool == get_anomalies.\n" "- Если TOOL_RESULT есть и ok=false: ответь кратко: \"ошибка инструмента: \".\n" "- Если TOOL_RESULT items/points пустые ИЛИ error.code == NO_DATA: ответ строго: \"нет данных\".\n" "- Если есть ANOMALIES_BRIEF: используй только его факты и числа, не выводи ISO/JSON.\n" " Обычно достаточно 1–2 связных предложения, затем 2–5 коротких строк с примерами.\n" "\n" "Стиль: деловой и лаконичный. Эмодзи допустимы, но умеренно. Без лишних вступлений и без пустых фраз.\n" "Отвечай на языке пользователя." ) def router_response_format(): return { "type": "json_schema", "json_schema": { "name": "router_decision_v1", "strict": True, "schema": { "$schema": "https://json-schema.org/draft/2020-12/schema", "title": "router_decision_v1", "type": "object", "oneOf": [ { "type": "object", "required": ["type", "request_type", "id", "tool", "args"], "properties": { "type": {"const": "tool_call_v1"}, "request_type": { "type": "string", "enum": REQUEST_TYPES, }, "id": {"type": "string", "minLength": 1}, "tool": { "type": "string", "enum": TOOLS_WHITELIST, }, "args": {"type": "object"}, }, "additionalProperties": False, }, { "type": "object", "required": ["type", "request_type"], "properties": { "type": {"const": "no_tool"}, "request_type": { "type": "string", "enum": REQUEST_TYPES, }, "reason": {"type": "string"}, }, "additionalProperties": False, }, ], }, }, } def call_llm(messages, temperature, max_tokens, response_format=None): require_llm_base_url() payload = { "model": LLM_MODEL, "messages": messages, "temperature": temperature, "max_tokens": max_tokens, } if response_format: payload["response_format"] = response_format try: with httpx.Client(timeout=LLM_TIMEOUT) as client: resp = client.post(f"{LLM_BASE_URL}/v1/chat/completions", json=payload) except httpx.RequestError as exc: raise HTTPException(status_code=502, detail=f"LLM request failed: {exc}") from exc if resp.status_code >= 400: raise HTTPException( status_code=502, detail=f"LLM error {resp.status_code}: {resp.text}", ) try: raw_text = resp.content.decode("utf-8") except UnicodeDecodeError: raw_text = resp.content.decode("utf-8", errors="replace") try: data = json.loads(raw_text) except json.JSONDecodeError as exc: raise HTTPException(status_code=502, detail="LLM response invalid JSON") from exc try: return data["choices"][0]["message"]["content"] except (KeyError, IndexError, TypeError) as exc: raise HTTPException(status_code=502, detail="LLM response invalid") from exc def parse_json_content(text): if not text: return None try: return json.loads(text) except json.JSONDecodeError: start = text.find("{") end = text.rfind("}") if start == -1 or end == -1 or end <= start: return None try: return json.loads(text[start : end + 1]) except json.JSONDecodeError: return None def normalize_request_type(value, reason, message): if isinstance(value, str): value = value.strip().lower() if value in REQUEST_TYPES: return value reason_text = (reason or "").lower() if "smalltalk" in reason_text: return "smalltalk" if "meta" in reason_text: return "meta" fallback = classify_message(message) if message else "eco" if fallback in REQUEST_TYPES: return fallback return "eco" def normalize_router_decision(decision, message): if not isinstance(decision, dict): return { "type": "no_tool", "reason": "router_invalid_json", "request_type": normalize_request_type(None, None, message), } decision_type = decision.get("type") reason = decision.get("reason") request_type = normalize_request_type(decision.get("request_type"), reason, message) if decision_type == "tool_call_v1": tool = decision.get("tool") args = decision.get("args") if tool not in TOOLS_WHITELIST or not isinstance(args, dict): return { "type": "no_tool", "reason": "router_invalid_tool", "request_type": request_type, } tool_id = decision.get("id") or f"call_{uuid.uuid4().hex[:8]}" return { "type": "tool_call_v1", "id": tool_id, "tool": tool, "args": args, "request_type": request_type, } if decision_type == "no_tool": return { "type": "no_tool", "reason": reason or "snapshot_enough", "request_type": request_type, } return { "type": "no_tool", "reason": "router_invalid_json", "request_type": request_type, } def run_router(snapshot_json, message): messages = [ {"role": "system", "content": ROUTER_PROMPT}, {"role": "user", "content": f"SNAPSHOT_JSON: {snapshot_json}"}, {"role": "user", "content": f"QUESTION: {message}"}, ] content = call_llm( messages, temperature=0, max_tokens=200, response_format=router_response_format(), ) decision = parse_json_content(content) if decision is None: repair_messages = [ { "role": "system", "content": "Return only JSON that matches the schema. Include request_type. No extra text.", }, {"role": "user", "content": content}, ] repair_content = call_llm( repair_messages, temperature=0, max_tokens=200, response_format=router_response_format(), ) decision = parse_json_content(repair_content) return normalize_router_decision(decision, message) def snapshot_range_args(snapshot): ui_range = snapshot["ui"]["range"] return { "from": ui_range["from"], "to": ui_range["to"], "tz": ui_range.get("tz") or UI_TZ, } def build_period_payload(snapshot, tool_result=None): range_data = None if tool_result and tool_result.get("ok"): data = tool_result.get("data") or {} range_data = data.get("range") if not range_data: range_data = snapshot.get("ui", {}).get("range") or {} from_ts = range_data.get("from") to_ts = range_data.get("to") if not from_ts or not to_ts: return None return { "from": from_ts, "to": to_ts, "tz": range_data.get("tz") or UI_TZ, } def wants_anomaly_details(message): text = (message or "").lower() if not text_has_any(text, ANOMALY_TOPIC_TRIGGERS): return False return text_has_any(text, ANOMALY_DETAIL_TRIGGERS) def wants_anomaly_full_list(message): text = (message or "").lower() if not text_has_any(text, ANOMALY_TOPIC_TRIGGERS): return False return text_has_any(text, ANOMALY_FULL_LIST_TRIGGERS) def wants_anomaly_chrono(message): text = (message or "").lower() return text_has_any(text, ANOMALY_TIME_ORDER_TRIGGERS) def wants_history(message): text = (message or "").lower() return text_has_any(text, HISTORY_TRIGGERS) def apply_fallbacks(decision, message, snapshot, station_focus, message_type): if message_type != "eco": return decision if decision["type"] != "no_tool": return decision station_id = station_focus or extract_station_id(message) if wants_anomaly_details(message): return { "type": "tool_call_v1", "id": f"call_{uuid.uuid4().hex[:8]}", "tool": "get_anomalies", "args": { "range": snapshot_range_args(snapshot), "types": ["spike", "stuck"], "limit": MAX_ANOMALIES, }, } if station_id and wants_history(message): return { "type": "tool_call_v1", "id": f"call_{uuid.uuid4().hex[:8]}", "tool": "get_station_history", "args": { "station_id": station_id, "range": snapshot_range_args(snapshot), "step_seconds": MIN_HISTORY_STEP_SECONDS, }, } return decision def sanitize_tool_call(decision, station_focus, snapshot): if decision.get("type") != "tool_call_v1": return decision tool = decision.get("tool") args = decision.get("args") or {} if not isinstance(args, dict): args = {} if tool == "get_station_history": station_id = args.get("station_id") if station_id is None and station_focus is not None: station_id = station_focus if station_id is None: return { "type": "no_tool", "reason": "history_requires_station_id", "request_type": decision.get("request_type") or "eco", } try: station_id = int(station_id) except (TypeError, ValueError): return { "type": "no_tool", "reason": "history_invalid_station_id", "request_type": decision.get("request_type") or "eco", } args["station_id"] = station_id if not args.get("range"): args["range"] = snapshot_range_args(snapshot) decision["args"] = args return decision if tool == "get_anomalies": if not args.get("range"): args["range"] = snapshot_range_args(snapshot) if "limit" not in args and "max_anomalies" in args: try: args["limit"] = int(args.get("max_anomalies")) except (TypeError, ValueError): pass decision["args"] = args return decision def tool_result_is_no_data(tool_result): if not tool_result: return False if tool_result.get("ok") is False: err = tool_result.get("error") or {} code = str(err.get("code") or "").upper() return code == "NO_DATA" data = tool_result.get("data") or {} tool = tool_result.get("tool") if tool == "get_anomalies": return False if tool == "get_station_history" and not data.get("points"): return True return False def format_tool_error(tool_result): err = (tool_result or {}).get("error") or {} code = err.get("code") or "ERROR" message = err.get("message") or "" if message: return f"ошибка инструмента: {code} — {message}" return f"ошибка инструмента: {code}" def strip_iso_period_lines(text): if not text: return text iso_ts = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?" pattern = re.compile(rf"^\s*(?:Период|Period)\s*:\s*{iso_ts}\s*[-–]\s*{iso_ts}\s*$") lines = str(text).replace("\r", "").split("\n") kept = [line for line in lines if not pattern.match(line.strip())] cleaned = "\n".join(kept).strip() return cleaned or text def clean_station_name(name, station_id=None): if not name: return f"station {station_id}" if station_id is not None else "station" return name.split(" (", 1)[0] def parse_local_dt(value): if not value: return None try: dt = parse_dt(value) except HTTPException: return None return dt.astimezone(LOCAL_TZ) def format_time_local(dt): return dt.strftime("%H:%M") if dt else None def format_date_local(dt): return dt.strftime("%d.%m") if dt else None def is_utc_day_range(from_ts, to_ts): if not from_ts or not to_ts: return False if from_ts[:10] != to_ts[:10]: return False start_ok = bool(re.search(r"T00:00(?::00(?:\\.\\d+)?)?(?:Z|[+-]\\d{2}:\\d{2})?$", from_ts)) end_ok = bool(re.search(r"T23:59(?::59(?:\\.\\d+)?)?(?:Z|[+-]\\d{2}:\\d{2})?$", to_ts)) return start_ok and end_ok def range_span_days(range_info): from_dt = parse_local_dt(range_info.get("from")) to_dt = parse_local_dt(range_info.get("to")) if not from_dt or not to_dt: return False return from_dt.date() != to_dt.date() def build_period_label(range_info): from_ts = range_info.get("from") to_ts = range_info.get("to") if is_utc_day_range(from_ts, to_ts): try: day = datetime.strptime(from_ts[:10], "%Y-%m-%d") except ValueError: return None return day.strftime("%d.%m") from_dt = parse_local_dt(from_ts) to_dt = parse_local_dt(to_ts) if from_dt and to_dt and from_dt.date() == to_dt.date(): return format_date_local(from_dt) return None def dedupe_anomalies(items): seen = set() cleaned = [] for item in items: evidence = item.get("evidence") or {} key = ( item.get("station_id"), item.get("type"), item.get("ts_start"), item.get("ts_end"), evidence.get("delta"), ) if key in seen: continue seen.add(key) cleaned.append(item) return cleaned def anomaly_key(item): evidence = item.get("evidence") or {} return ( item.get("station_id"), item.get("type"), item.get("ts_start"), item.get("ts_end"), evidence.get("delta"), ) def anomaly_delta(item): evidence = item.get("evidence") or {} delta = evidence.get("delta") if isinstance(delta, (int, float)): return delta return -1 def anomaly_time_label(item, include_date): dt = parse_local_dt(item.get("ts_start")) if not dt: return None time_label = format_time_local(dt) if include_date: date_label = format_date_local(dt) if date_label and time_label: return f"{date_label} {time_label}" return date_label or time_label return time_label def build_anomalies_brief(tool_result, top_n=MAX_ANOMALIES_BRIEF): data = (tool_result or {}).get("data") or {} items = data.get("items") or [] cleaned = dedupe_anomalies(items) range_info = data.get("range") or {} include_date = range_span_days(range_info) period_label = build_period_label(range_info) stations = set() for item in cleaned: station_id = item.get("station_id") if station_id is not None: stations.add(station_id) continue station_name = item.get("station_name") if station_name: stations.add(station_name) stations_count = len(stations) if stations else None sorted_by_delta = sorted(cleaned, key=anomaly_delta, reverse=True) strongest_item = sorted_by_delta[0] if sorted_by_delta else None strongest_key = anomaly_key(strongest_item) if strongest_item else None def to_brief(item): evidence = item.get("evidence") or {} return { "station_name": clean_station_name(item.get("station_name"), item.get("station_id")), "time_msk": anomaly_time_label(item, include_date), "prev": evidence.get("prev_aqi"), "curr": evidence.get("curr_aqi"), "delta": evidence.get("delta"), "kind": item.get("type"), } strongest = to_brief(strongest_item) if strongest_item else None top_items = [] for item in sorted_by_delta: if strongest_key and anomaly_key(item) == strongest_key: continue top_items.append(to_brief(item)) if len(top_items) >= top_n: break return { "count": len(cleaned), "stations_count": stations_count, "period_label": period_label, "strongest": strongest, "top_items": top_items, } def format_anomalies_full_list(tool_result, max_items=MAX_ANOMALIES_FULL_LIST, chrono=False): data = (tool_result or {}).get("data") or {} items = data.get("items") or [] if not items: return ( "За выбранный период аномалий не найдено. " "Можно проверить 24 часа / 3 дня / другую станцию." ) cleaned = dedupe_anomalies(items) range_info = data.get("range") or {} include_date = range_span_days(range_info) if chrono: cleaned.sort(key=lambda x: x.get("ts_start") or "") else: cleaned.sort(key=anomaly_delta, reverse=True) limit = min(len(cleaned), max_items) lines = [] for item in cleaned[:limit]: evidence = item.get("evidence") or {} station = clean_station_name(item.get("station_name"), item.get("station_id")) time_label = anomaly_time_label(item, include_date) prev = evidence.get("prev_aqi") curr = evidence.get("curr_aqi") delta = evidence.get("delta") parts = [station] if time_label: parts.append(time_label) line = " — ".join(parts) if prev is not None and curr is not None and delta is not None: line = f"{line} — {prev} → {curr} (+{delta})" elif delta is not None: line = f"{line} — Δ {delta}" lines.append(line) if len(cleaned) > limit: lines.append( f"Показала первые {limit}. Можно сузить период или выбрать станцию." ) return "\n".join(lines) def validate_range(from_ts, to_ts, max_hours=None, max_days=None): if from_ts is None or to_ts is None: return "INVALID_RANGE", "Range is required" if from_ts > to_ts: return "INVALID_RANGE", "'from' must be <= 'to'" range_seconds = (to_ts - from_ts).total_seconds() if max_hours and range_seconds > max_hours * 3600: return "RANGE_TOO_LARGE", f"Range exceeds {max_hours} hours" if max_days and range_seconds > max_days * 86400: return "RANGE_TOO_LARGE", f"Range exceeds {max_days} days" return None, None def tool_result_ok(tool_id, tool, data): return { "type": "tool_result_v1", "id": tool_id, "tool": tool, "ok": True, "data": data, } def tool_result_error(tool_id, tool, code, message): return { "type": "tool_result_v1", "id": tool_id, "tool": tool, "ok": False, "error": {"code": code, "message": message}, } def handle_get_snapshot(args): range_args = (args or {}).get("range") if not range_args: return tool_result_error("missing", "get_snapshot", "INVALID_RANGE", "Range is required") from_ts = parse_dt(range_args.get("from")) to_ts = parse_dt(range_args.get("to")) code, message = validate_range(from_ts, to_ts, max_hours=MAX_RANGE_HOURS_SNAPSHOT) if code: return tool_result_error("missing", "get_snapshot", code, message) limits = (args or {}).get("limits") or {} stations_limit = limits.get("stations") or MAX_STATIONS_IN_SNAPSHOT stations_limit = min(int(stations_limit), MAX_STATIONS_IN_SNAPSHOT) now = datetime.now(timezone.utc) summary = fetch_summary(now, from_ts, to_ts) selection = select_llm_stations(fetch_stations(now)) stations = selection["stations"][:stations_limit] anomalies_summary = fetch_anomalies_summary( from_ts, to_ts, anomaly_types=["spike", "stuck"] ) data = { "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)}, "summary": summary, "stations": stations, "anomalies_summary": anomalies_summary, "stations_in_snapshot": len(stations), "stations_truncated": selection["stations_total"] > len(stations), "selection_rule": selection["selection_rule"], } return tool_result_ok("missing", "get_snapshot", data) def handle_get_anomalies(args): range_args = (args or {}).get("range") if not range_args: return tool_result_error("missing", "get_anomalies", "INVALID_RANGE", "Range is required") from_ts = parse_dt(range_args.get("from")) to_ts = parse_dt(range_args.get("to")) code, message = validate_range(from_ts, to_ts, max_days=MAX_RANGE_DAYS_ANOMALIES) if code: return tool_result_error("missing", "get_anomalies", code, message) types = (args or {}).get("types") or ["spike", "stuck"] allowed_types = [t for t in types if t in ("spike", "stuck")] if not allowed_types: allowed_types = ["spike", "stuck"] limit = int((args or {}).get("limit") or MAX_ANOMALIES) limit = min(limit, MAX_ANOMALIES) data = fetch_anomalies(from_ts, to_ts, limit, None, allowed_types) return tool_result_ok("missing", "get_anomalies", data) def handle_get_station_history(args): range_args = (args or {}).get("range") if not range_args: return tool_result_error( "missing", "get_station_history", "INVALID_RANGE", "Range is required" ) from_ts = parse_dt(range_args.get("from")) to_ts = parse_dt(range_args.get("to")) code, message = validate_range(from_ts, to_ts, max_days=MAX_RANGE_DAYS_HISTORY) if code: return tool_result_error("missing", "get_station_history", code, message) station_id = (args or {}).get("station_id") if station_id is None: return tool_result_error( "missing", "get_station_history", "INVALID_ARGS", "station_id is required" ) step_seconds = (args or {}).get("step_seconds") or MIN_HISTORY_STEP_SECONDS data = fetch_station_history(int(station_id), from_ts, to_ts, int(step_seconds)) return tool_result_ok("missing", "get_station_history", data) def execute_tool_call(decision): tool_id = decision.get("id") or f"call_{uuid.uuid4().hex[:8]}" tool = decision.get("tool") args = decision.get("args") or {} if tool == "get_snapshot": result = handle_get_snapshot(args) elif tool == "get_anomalies": result = handle_get_anomalies(args) elif tool == "get_station_history": result = handle_get_station_history(args) else: result = tool_result_error(tool_id, tool, "INVALID_TOOL", "Tool not supported") result["id"] = tool_id result["tool"] = tool return result def build_answer( message, message_type, snapshot_json=None, tool_result=None, anomalies_brief=None, anomalies_mode=None, ): messages = [{"role": "system", "content": ANSWER_PROMPT}] req_type = (message_type or "unknown").upper() header = f"REQUEST_TYPE: {req_type}\nQUESTION: {message}" if anomalies_mode: header = f"{header}\nANOMALIES_MODE: {anomalies_mode}" messages.append( { "role": "user", "content": header, } ) if snapshot_json: messages.append( { "role": "user", "content": "SNAPSHOT_JSON:\n```json\n" + snapshot_json + "\n```", } ) if tool_result is not None: tool_json = json.dumps(tool_result, ensure_ascii=False) messages.append( { "role": "user", "content": "TOOL_RESULT:\n```json\n" + tool_json + "\n```", } ) if anomalies_brief is not None: brief_json = json.dumps(anomalies_brief, ensure_ascii=False) messages.append( { "role": "user", "content": "ANOMALIES_BRIEF:\n```json\n" + brief_json + "\n```", } ) return call_llm(messages, temperature=0.0, max_tokens=512) def build_trace_payload( message, request_type, decision, snapshot, snapshot_llm, tool_result, tool_result_llm, anomalies_brief=None, anomalies_mode=None, period=None, ): if request_type != "eco": return None llm_input = {} if anomalies_brief is not None: llm_input["anomalies_brief"] = anomalies_brief if anomalies_mode: llm_input["anomalies_mode"] = anomalies_mode else: if snapshot_llm is not None: llm_input["snapshot"] = snapshot_llm if tool_result_llm is not None: llm_input["tool_result"] = tool_result_llm raw_json = tool_result if tool_result is not None else snapshot return { "trace_id": str(uuid.uuid4()), "request_type": request_type, "question": message, "router": decision, "llm_input": llm_input if llm_input else None, "raw_json": raw_json, "period": period, } @app.get("/health") def health(): return {"ok": True} @app.get("/aip/waqi/summary") def waqi_summary( _auth: None = Depends(require_api_key), from_ts: str | None = Query(default=None, alias="from"), to_ts: str | None = Query(default=None, alias="to"), ): try: to_dt = parse_dt(to_ts) or datetime.now(timezone.utc) from_dt = parse_dt(from_ts) or (to_dt - timedelta(hours=24)) if from_dt > to_dt: raise HTTPException(status_code=400, detail="'from' must be <= 'to'") return fetch_summary(to_dt, from_dt, to_dt) except RuntimeError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc @app.get("/aip/waqi/anomalies") def waqi_anomalies( _auth: None = Depends(require_api_key), from_ts: str | None = Query(default=None, alias="from"), to_ts: str | None = Query(default=None, alias="to"), limit: int = Query(default=200, ge=1, le=1000), station_id: int | None = Query(default=None), anomaly_type: str | None = Query(default=None, alias="type"), ): try: to_dt = parse_dt(to_ts) or datetime.now(timezone.utc) from_dt = parse_dt(from_ts) or (to_dt - timedelta(hours=24)) if from_dt > to_dt: raise HTTPException(status_code=400, detail="'from' must be <= 'to'") return fetch_anomalies(from_dt, to_dt, limit, station_id, anomaly_type) except RuntimeError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc @app.get("/aip/waqi/stations") def waqi_stations( _auth: None = Depends(require_api_key), ): try: now = datetime.now(timezone.utc) return { "now": dt_iso(now), "windows": { "ingested_minutes": INGESTED_WINDOW_MINUTES, "observed_fresh_minutes": OBSERVED_FRESH_MINUTES, "observed_recent_hours": OBSERVED_RECENT_HOURS, }, "items": fetch_stations(now), } except RuntimeError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc @app.get("/aip/waqi/history") def waqi_history( _auth: None = Depends(require_api_key), date: str = Query(...), since: str | None = Query(default=None), ): try: day = parse_date(date) if day is None: raise HTTPException(status_code=400, detail="'date' is required") from_dt, to_dt = day_bounds_utc(day) since_dt = parse_dt(since) if since else None items = fetch_history(from_dt, to_dt, since_dt) return { "date": date, "range": {"from": dt_iso(from_dt), "to": dt_iso(to_dt)}, "items": items, } except RuntimeError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc @app.get("/aip/waqi/history_bounds") def waqi_history_bounds( _auth: None = Depends(require_api_key), ): try: min_ts, max_ts = fetch_history_bounds() return { "tz": UI_TZ, "min_date": local_date_key(min_ts), "max_date": local_date_key(max_ts), } except RuntimeError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc @app.get("/aip/waqi/report") def waqi_report(): return {} @app.post("/llm/chat") def llm_chat(payload: dict = Body(...)): message = str(payload.get("message") or "").strip() if not message: raise HTTPException(status_code=400, detail="'message' is required") mode = payload.get("mode") or "live" range_payload = payload.get("range") or {} from_raw = payload.get("from") or range_payload.get("from") to_raw = payload.get("to") or range_payload.get("to") try: message_type = classify_message(message) to_dt = parse_dt(to_raw) or datetime.now(timezone.utc) from_dt = parse_dt(from_raw) or (to_dt - timedelta(hours=24)) if from_dt > to_dt: raise HTTPException(status_code=400, detail="'from' must be <= 'to'") station_focus = payload.get("station_focus") or payload.get("station_id") if station_focus is not None: try: station_focus = int(station_focus) except (TypeError, ValueError): station_focus = None station_focus = station_focus or extract_station_id(message) now = datetime.now(timezone.utc) snapshot = build_snapshot( now, from_dt, to_dt, mode, station_focus, MAX_STATIONS_IN_SNAPSHOT, ) snapshot_llm = localize_llm_times(snapshot) snapshot_json = json.dumps(snapshot, ensure_ascii=False) snapshot_llm_json = json.dumps(snapshot_llm, ensure_ascii=False) decision = run_router(snapshot_json, message) anomalies_brief = None anomalies_mode = None effective_type = (decision.get("request_type") or message_type or "eco").lower() if effective_type in ("smalltalk", "meta"): decision = { "type": "no_tool", "reason": "no_tool_for_smalltalk_or_meta", "request_type": effective_type, } answer = build_answer(message, effective_type) payload = { "request_id": snapshot["request_id"], "router": decision, "tool_result": None, "answer": answer, "period": None, } return JSONResponse(payload, media_type="application/json; charset=utf-8") message_type = "eco" decision = apply_fallbacks(decision, message, snapshot, station_focus, message_type) decision["request_type"] = "eco" if decision["type"] == "tool_call_v1" and decision.get("tool") == "get_anomalies": args = decision.get("args") or {} if not isinstance(args, dict): args = {} if not args.get("range"): args["range"] = snapshot_range_args(snapshot) if "limit" not in args and "max_anomalies" in args: try: args["limit"] = int(args.get("max_anomalies")) except (TypeError, ValueError): pass decision["args"] = args decision = sanitize_tool_call(decision, station_focus, snapshot) tool_result = None if decision["type"] == "tool_call_v1": tool_result = execute_tool_call(decision) tool_result_llm = localize_llm_times(tool_result) if tool_result else None if tool_result and tool_result.get("ok") is False: answer = format_tool_error(tool_result) elif tool_result and tool_result.get("ok") and tool_result.get("tool") == "get_anomalies": items = (tool_result.get("data") or {}).get("items") or [] if not items: answer = ( "За выбранный период аномалий не найдено. " "Можно проверить 24 часа / 3 дня / другую станцию." ) else: brief = build_anomalies_brief(tool_result, top_n=MAX_ANOMALIES_BRIEF) anomalies_brief = brief if wants_anomaly_full_list(message): anomalies_mode = "expand" full_list = format_anomalies_full_list( tool_result, max_items=MAX_ANOMALIES_FULL_LIST, chrono=wants_anomaly_chrono(message), ) intro = build_answer( message, message_type, anomalies_brief=brief, anomalies_mode="expand", ) answer = f"{intro}\n{full_list}".strip() else: anomalies_mode = "brief" answer = build_answer( message, message_type, anomalies_brief=brief, anomalies_mode="brief", ) elif tool_result_is_no_data(tool_result): answer = "нет данных" else: answer = build_answer(message, message_type, snapshot_llm_json, tool_result_llm) answer = strip_iso_period_lines(answer) period = build_period_payload(snapshot, tool_result) trace_payload = build_trace_payload( message, "eco", decision, snapshot, snapshot_llm, tool_result, tool_result_llm, anomalies_brief=anomalies_brief, anomalies_mode=anomalies_mode, period=period, ) payload = { "request_id": snapshot["request_id"], "router": decision, "tool_result": tool_result, "answer": answer, "period": period, "trace_payload": trace_payload, } return JSONResponse(payload, media_type="application/json; charset=utf-8") except RuntimeError as exc: raise HTTPException(status_code=500, detail=str(exc)) from exc