NODEDC_1C/docs/ADDRESS/tz/1/eco-aip-backend/api/app/main.py

1842 lines
60 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hmac
import json
import os
import re
import uuid
from contextlib import closing
from datetime import datetime, timedelta, timezone

import httpx
import psycopg2
from fastapi import Body, Depends, FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
app = FastAPI()
# PostgreSQL connection string; checked lazily by require_db_url().
DATABASE_URL = os.environ.get("DATABASE_URL")
# Shared secret compared with the "x-api-key" header; empty disables the check.
API_KEY = os.environ.get("API_KEY", "").strip()
# Freshness windows used by classify_station(): "in flow" = ingested within
# INGESTED_WINDOW_MINUTES; last observation "fresh" within OBSERVED_FRESH_MINUTES,
# "recent" within OBSERVED_RECENT_HOURS, otherwise "stale".
INGESTED_WINDOW_MINUTES = 30
OBSERVED_FRESH_MINUTES = 60
OBSERVED_RECENT_HOURS = 4
# Comma-separated CORS allow-list from the environment; blank entries are dropped.
CORS_ORIGINS = [
    origin.strip()
    for origin in os.environ.get("CORS_ORIGINS", "http://localhost:5173").split(",")
    if origin.strip()
]
# Fixed UTC+3 offset used for local formatting; UI_TZ is the zone name reported to clients.
LOCAL_TZ = timezone(timedelta(hours=3))
UI_TZ = "Europe/Moscow"
# Settings for the chat-completions server used by call_llm().
LLM_BASE_URL = os.environ.get("LLM_BASE_URL", "").rstrip("/")
LLM_MODEL = os.environ.get("LLM_MODEL", "saiga_nemo_12b_gguf")
LLM_TIMEOUT = float(os.environ.get("LLM_TIMEOUT", "60"))
# Maximum number of stations passed to the LLM in a snapshot (select_llm_stations()).
LLM_MAX_STATIONS = int(os.environ.get("LLM_MAX_STATIONS", "30"))
# Limits reported to the LLM in snapshot_v1["limits"]; MAX_HISTORY_POINTS and
# MIN_HISTORY_STEP_SECONDS are also applied by compute_history_step().
MAX_TOOL_ITERATIONS = 3
MAX_RANGE_HOURS_SNAPSHOT = 72
MAX_RANGE_DAYS_ANOMALIES = 14
MAX_RANGE_DAYS_HISTORY = 31
MAX_STATIONS_IN_SNAPSHOT = LLM_MAX_STATIONS
MAX_ANOMALIES = 100
MAX_ANOMALIES_ANSWER = 10
# Default number of extra items in build_anomalies_brief().
MAX_ANOMALIES_BRIEF = 3
# Default cap on lines in format_anomalies_full_list().
MAX_ANOMALIES_FULL_LIST = 30
MAX_HISTORY_POINTS = 500
MIN_HISTORY_STEP_SECONDS = 300
# Tools the router is allowed to call (also the enum in router_response_format()).
TOOLS_WHITELIST = ["get_snapshot", "get_anomalies", "get_station_history"]
# Request categories assigned by classify_message() and the router.
REQUEST_TYPES = ["smalltalk", "meta", "eco"]
# The keyword lists below are lower-case fragments matched as plain substrings by
# text_has_any() against the lower-cased user message.
# NOTE(review): because matching is by substring, short entries such as "ok", "hi",
# "test" or "ping" also fire inside longer words ("look", "which", "latest", "shopping").
# Fragments indicating that the message is about anomalies at all.
ANOMALY_TOPIC_TRIGGERS = [
    "аномал",
    "всплеск",
    "скач",
    "скачок",
    "пик",
    "пики",
    "залип",
    "застрял",
    "подозр",
    "резк",
    "spike",
    "stuck",
    "outlier",
    "flatline",
    "stale",
    "not updating",
]
# Combined with a topic trigger in wants_anomaly_details(): the user wants concrete
# anomaly details, which makes apply_fallbacks() call get_anomalies.
ANOMALY_DETAIL_TRIGGERS = [
    "перечисли",
    "список",
    "покажи все",
    "подроб",
    "подтверж",
    "значени",
    "дельта",
    "по станциям",
    "когда",
    "таймстемп",
    "evidence",
    "timestamps",
    "details",
    "list",
    "show all",
    "where",
    "which",
]
# Combined with a topic trigger in wants_anomaly_full_list(): the user wants every anomaly.
ANOMALY_FULL_LIST_TRIGGERS = [
    "полный список",
    "покажи все",
    "перечисли все",
    "все аномалии",
    "все всплески",
    "покажи полный",
    "show all anomalies",
    "full list",
]
# Requests for chronological ordering (wants_anomaly_chrono()).
ANOMALY_TIME_ORDER_TRIGGERS = [
    "по времени",
    "хронолог",
    "по порядку",
    "сначала",
    "по хронологии",
    "chronolog",
]
# Requests for a time series; with a station id apply_fallbacks() calls get_station_history.
HISTORY_TRIGGERS = [
    "истор",
    "график",
    "динамик",
    "тренд",
    "таймлайн",
    "по часам",
    "по минутам",
    "за период",
    "серия",
    "кривая",
    "trend",
    "history",
    "timeline",
    "time series",
    "series",
    "chart",
    "plot",
    "over time",
]
# Greetings/acknowledgements/connectivity checks; classify_message() checks these last.
SMALLTALK_TRIGGERS = [
    "привет",
    "здрав",
    "добрый",
    "как дела",
    "спасибо",
    "спс",
    "ок",
    "окей",
    "оке",
    "лол",
    "тест",
    "провер",
    "работает",
    "напиши фразу",
    "напиши слово",
    "угу",
    "ага",
    "ясно",
    "понятно",
    "ладно",
    "алло",
    "ты тут",
    "hi",
    "hello",
    "thanks",
    "ok",
    "test",
    "write a phrase",
    "are you there",
    "ping",
]
# Questions about the assistant/system itself; checked after eco, before smalltalk.
META_TRIGGERS = [
    "почему ты",
    "как работает",
    "что такое",
    "объясни",
    "tool",
    "snapshot",
    "scope",
    "evidence",
    "prompt",
    "model",
    "llm",
    "system",
    "tools",
    "оркестратор",
    "router",
    "answer",
    "контекст",
    "json",
    "schema",
    "endpoint",
    "api",
    "backend",
]
# Air-quality/data topics; a match here (or an anomaly/history trigger) means "eco".
ECO_TRIGGERS = [
    "aqi",
    "air quality",
    "воздух",
    "качество воздуха",
    "станц",
    "измерен",
    "свеж",
    "данные",
    "ingest",
    "observed",
    "fresh",
    "recent",
    "stale",
    "pm2.5",
    "pm10",
    "anomal",
    "station",
    "history",
    "timeline",
]
if CORS_ORIGINS:
    # Credentials must never be combined with a wildcard origin: browsers reject
    # "Access-Control-Allow-Origin: *" on credentialed requests, and Starlette's
    # CORSMiddleware, given "*" plus allow_credentials, answers such requests by
    # echoing the caller's Origin. Disable credentials whenever "*" appears anywhere
    # in the list (not only when it is the sole entry), so a mixed list such as
    # "*,https://app.example" cannot grant credentialed access to arbitrary sites.
    allow_credentials = "*" not in CORS_ORIGINS
    app.add_middleware(
        CORSMiddleware,
        allow_origins=CORS_ORIGINS,
        allow_credentials=allow_credentials,
        allow_methods=["*"],
        allow_headers=["*"],
    )
def require_db_url():
    """Ensure the database is configured.

    Raises:
        RuntimeError: when DATABASE_URL is empty or unset.
    """
    if DATABASE_URL:
        return
    raise RuntimeError("DATABASE_URL is not set")
def require_api_key(request: Request):
    """FastAPI dependency that enforces the shared API key.

    When API_KEY is empty, authentication is disabled and every request passes.
    Otherwise the ``x-api-key`` request header must equal API_KEY.

    Raises:
        HTTPException: 401 when the header is missing or does not match.
    """
    if not API_KEY:
        return
    provided = request.headers.get("x-api-key") or ""
    # Constant-time comparison so response timing does not reveal how much of the
    # key matched. Compare bytes: compare_digest raises TypeError for non-ASCII str
    # arguments, which a client-controlled header could otherwise turn into a 500.
    if not hmac.compare_digest(provided.encode("utf-8"), API_KEY.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid api key")
def require_llm_base_url():
    """Ensure the LLM endpoint is configured.

    Raises:
        RuntimeError: when LLM_BASE_URL is empty.
    """
    if LLM_BASE_URL:
        return
    raise RuntimeError("LLM_BASE_URL is not set")
def text_has_any(text, keywords):
    """Return True if any of ``keywords`` occurs as a substring of ``text``."""
    for fragment in keywords:
        if fragment in text:
            return True
    return False
def classify_message(message):
    """Heuristically classify a user message as "eco", "meta" or "smalltalk".

    Eco keywords (including anomaly and history words) win first, then meta, then
    smalltalk. A blank message is smalltalk; anything unmatched defaults to eco.
    """
    lowered = (message or "").lower()
    eco_groups = (ECO_TRIGGERS, ANOMALY_TOPIC_TRIGGERS, HISTORY_TRIGGERS)
    if any(text_has_any(lowered, group) for group in eco_groups):
        return "eco"
    if text_has_any(lowered, META_TRIGGERS):
        return "meta"
    if text_has_any(lowered, SMALLTALK_TRIGGERS):
        return "smalltalk"
    return "eco" if lowered.strip() else "smalltalk"
def extract_station_id(text):
    """Return the first number following "station"/"станц" in ``text``, or None."""
    found = re.search(r"(?:station|станц)[^0-9]*(\d+)", text, re.IGNORECASE) if text else None
    if found is None:
        return None
    digits = found.group(1)
    try:
        return int(digits)
    except ValueError:
        return None
def parse_dt(value):
    """Parse an ISO-8601 string into an aware UTC datetime.

    Empty/None input yields None. A trailing "Z" is accepted, and naive values are
    taken to be UTC.

    Raises:
        HTTPException: 400 when the string is not a valid ISO datetime.
    """
    if not value:
        return None
    raw = value.strip()
    # fromisoformat() before Python 3.11 does not understand a "Z" suffix.
    if raw.endswith("Z"):
        raw = f"{raw[:-1]}+00:00"
    try:
        parsed = datetime.fromisoformat(raw)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Invalid datetime: {raw}") from exc
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
# Full-string match for an ISO-8601 date-time: "YYYY-MM-DDTHH:MM" with optional
# seconds, fractional seconds and "Z"/±HH:MM offset. format_llm_timestamp() only
# rewrites strings that match, so other strings pass through unchanged.
ISO_LLM_DATETIME_RE = re.compile(
    r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(?::\d{2})?(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?$"
)
def format_llm_timestamp(value):
    """Render an ISO timestamp string as local "YYYY-MM-DD HH:MM" (LOCAL_TZ).

    Values that are not strings, do not look like ISO datetimes, or fail to parse
    are returned unchanged.
    """
    looks_iso = isinstance(value, str) and ISO_LLM_DATETIME_RE.match(value)
    if not looks_iso:
        return value
    try:
        parsed = parse_dt(value)
    except HTTPException:
        return value
    return parsed.astimezone(LOCAL_TZ).strftime("%Y-%m-%d %H:%M")
def localize_llm_times(value):
    """Recursively apply format_llm_timestamp() to every leaf of a dict/list tree.

    Dictionary keys are kept as-is; only values and list elements are converted.
    """
    if isinstance(value, list):
        return [localize_llm_times(element) for element in value]
    if isinstance(value, dict):
        return {k: localize_llm_times(v) for k, v in value.items()}
    return format_llm_timestamp(value)
def parse_date(value):
    """Parse "YYYY-MM-DD" into a ``date``; empty/None yields None.

    Raises:
        HTTPException: 400 when the string is not a valid date.
    """
    if not value:
        return None
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Invalid date: {value}") from exc
    return parsed.date()
def day_bounds_utc(day):
    """Return (start, end) in UTC of the LOCAL_TZ calendar day ``day``; end is exclusive."""
    start_local = datetime(day.year, day.month, day.day, tzinfo=LOCAL_TZ)
    bounds = (start_local, start_local + timedelta(days=1))
    return tuple(moment.astimezone(timezone.utc) for moment in bounds)
def dt_iso(dt):
    """Serialize a datetime as an ISO-8601 UTC string with a "Z" suffix.

    Naive datetimes are interpreted as UTC, the same convention used by parse_dt()
    and local_date_key(). Returns None for None.
    """
    if dt is None:
        return None
    # astimezone() on a naive value assumes the *host's* local zone, which would
    # shift timestamps whenever the server does not run in UTC.
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
def local_date_key(dt):
    """Return the LOCAL_TZ calendar date of ``dt`` as "YYYY-MM-DD" (naive = UTC)."""
    if dt is None:
        return None
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
    return aware.astimezone(LOCAL_TZ).date().isoformat()
def build_station_snapshots(cur):
    """Fetch one row per station with its latest ingestion, observation and AQI.

    Row layout: (id, provider_uid, name, lat, lon, last_ingested_ts,
    last_observed_ts, last_aqi, last_aqi_ts). Stations without measurements get
    NULLs through the LEFT JOINs; rows are ordered by station id.
    """
    query = """
WITH last_ingested AS (
SELECT station_id, MAX(ingested_ts) AS last_ingested_ts
FROM measurements
GROUP BY station_id
),
last_observed AS (
SELECT DISTINCT ON (station_id) station_id, observed_ts AS last_observed_ts
FROM measurements
ORDER BY station_id, observed_ts DESC
),
last_aqi AS (
SELECT DISTINCT ON (station_id) station_id, observed_ts AS last_aqi_ts, aqi AS last_aqi
FROM measurements
WHERE aqi IS NOT NULL
ORDER BY station_id, observed_ts DESC
)
SELECT s.id,
s.provider_uid,
s.name,
s.lat,
s.lon,
li.last_ingested_ts,
lo.last_observed_ts,
la.last_aqi,
la.last_aqi_ts
FROM stations s
LEFT JOIN last_ingested li ON li.station_id = s.id
LEFT JOIN last_observed lo ON lo.station_id = s.id
LEFT JOIN last_aqi la ON la.station_id = s.id
ORDER BY s.id;
"""
    cur.execute(query)
    rows = cur.fetchall()
    return rows
def classify_station(now, last_ingested_ts, last_observed_ts):
    """Return (in_flow, fresh, recent, stale) flags for a station at time ``now``.

    A station is in flow when it ingested data within INGESTED_WINDOW_MINUTES;
    stations not in flow get all-False flags. For in-flow stations the last
    observation is fresh (within OBSERVED_FRESH_MINUTES), recent (within
    OBSERVED_RECENT_HOURS but not fresh) or stale (older, or no observation).
    """
    ingested_cutoff = now - timedelta(minutes=INGESTED_WINDOW_MINUTES)
    fresh_cutoff = now - timedelta(minutes=OBSERVED_FRESH_MINUTES)
    recent_cutoff = now - timedelta(hours=OBSERVED_RECENT_HOURS)
    if not last_ingested_ts or last_ingested_ts < ingested_cutoff:
        return False, False, False, False
    if not last_observed_ts:
        return True, False, False, True
    fresh = last_observed_ts >= fresh_cutoff
    recent = recent_cutoff <= last_observed_ts < fresh_cutoff
    stale = last_observed_ts < recent_cutoff
    return True, fresh, recent, stale
def sort_stations_for_llm(stations):
    """Order stations for the LLM snapshot.

    Stations with a known AQI come first (highest AQI first); ties are broken by
    most recent observation, then by ascending station id.
    """
    def _rank(station):
        aqi = station.get("last_aqi")
        observed_raw = station.get("last_observed_ts")
        observed_at = parse_dt(observed_raw) if observed_raw else None
        recency = observed_at.timestamp() if observed_at else float("-inf")
        aqi_value = 0.0 if aqi is None else float(aqi)
        return (aqi is None, -aqi_value, -recency, station.get("station_id") or 0)

    return sorted(stations, key=_rank)
def select_llm_stations(stations):
    """Pick the top LLM_MAX_STATIONS stations (see sort_stations_for_llm) plus counters."""
    ranked = sort_stations_for_llm(stations)
    chosen = ranked[:LLM_MAX_STATIONS]
    return {
        "stations": chosen,
        "stations_total": len(ranked),
        "stations_in_snapshot": len(chosen),
        "stations_truncated": len(ranked) > len(chosen),
        "selection_rule": "aqi_desc_top_n",
    }
def fetch_stations(now):
    """Return every station with its latest readings and freshness flags.

    Each item holds the station metadata, the latest AQI/observation/ingestion
    timestamps (serialized with dt_iso) and the flags from classify_station(now, ...).

    Raises:
        RuntimeError: if DATABASE_URL is not configured.
    """
    require_db_url()
    # psycopg2's connection context manager only commits/rolls back the transaction;
    # it does not close the connection. closing() releases it deterministically.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            rows = build_station_snapshots(cur)
    items = []
    for (
        station_id,
        provider_uid,
        name,
        lat,
        lon,
        last_ingested_ts,
        last_observed_ts,
        last_aqi,
        last_aqi_ts,
    ) in rows:
        in_flow, fresh, recent, stale = classify_station(
            now, last_ingested_ts, last_observed_ts
        )
        items.append(
            {
                "station_id": station_id,
                "provider_uid": provider_uid,
                "name": name,
                "lat": lat,
                "lon": lon,
                "last_aqi": last_aqi,
                "last_aqi_ts": dt_iso(last_aqi_ts),
                "last_observed_ts": dt_iso(last_observed_ts),
                "last_ingested_ts": dt_iso(last_ingested_ts),
                "in_flow": in_flow,
                "fresh": fresh,
                "recent": recent,
                "stale": stale,
            }
        )
    return items
def fetch_summary(now, from_ts, to_ts):
    """Aggregate network-wide freshness counters and current AQI statistics.

    Only in-flow stations (see classify_station) are counted as fresh/recent/stale
    and contribute to ``aqi_now``. ``last_ingested_ts``/``last_observed_ts`` are
    maxima over all stations. ``from_ts``/``to_ts`` are only echoed in ``range``.

    Raises:
        RuntimeError: if DATABASE_URL is not configured.
    """
    require_db_url()
    # "with conn" in psycopg2 only ends the transaction; closing() closes the connection.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM stations;")
            stations_total = cur.fetchone()[0]
            snapshot_rows = build_station_snapshots(cur)
    inflow_count = 0
    fresh_count = 0
    recent_count = 0
    stale_count = 0
    last_ingested = None
    last_observed = None
    aqi_values = []
    for (
        _station_id,
        _provider_uid,
        _name,
        _lat,
        _lon,
        last_ingested_ts,
        last_observed_ts,
        last_aqi,
        _last_aqi_ts,
    ) in snapshot_rows:
        if last_ingested_ts and (last_ingested is None or last_ingested_ts > last_ingested):
            last_ingested = last_ingested_ts
        if last_observed_ts and (last_observed is None or last_observed_ts > last_observed):
            last_observed = last_observed_ts
        in_flow, fresh, recent, stale = classify_station(
            now, last_ingested_ts, last_observed_ts
        )
        if not in_flow:
            continue
        inflow_count += 1
        if fresh:
            fresh_count += 1
        elif recent:
            recent_count += 1
        else:
            stale_count += 1
        if last_aqi is not None:
            aqi_values.append(last_aqi)
    avg_now = None
    max_now = None
    if aqi_values:
        avg_now = sum(aqi_values) / len(aqi_values)
        max_now = max(aqi_values)
    return {
        "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)},
        "windows": {
            "ingested_minutes": INGESTED_WINDOW_MINUTES,
            "observed_fresh_minutes": OBSERVED_FRESH_MINUTES,
            "observed_recent_hours": OBSERVED_RECENT_HOURS,
        },
        "stations_total": stations_total,
        "in_flow": inflow_count,
        "fresh": fresh_count,
        "recent": recent_count,
        "stale": stale_count,
        "aqi_now": {
            "avg": avg_now,
            "max": max_now,
            "count": len(aqi_values),
        },
        "last_ingested_ts": dt_iso(last_ingested),
        "last_observed_ts": dt_iso(last_observed),
        "now": dt_iso(now),
    }
def fetch_anomalies_summary(from_ts, to_ts, anomaly_types=None, station_id=None):
    """Count anomalies whose ts_start lies in [from_ts, to_ts].

    Optional filters: ``station_id`` and ``anomaly_types`` (a single type string or
    an iterable of types). Returns {"has_anomalies": bool, "count": int}.

    Raises:
        RuntimeError: if DATABASE_URL is not configured.
    """
    require_db_url()
    params = [from_ts, to_ts]
    where = ["a.ts_start >= %s", "a.ts_start <= %s"]
    if station_id is not None:
        where.append("a.station_id = %s")
        params.append(station_id)
    types = None
    if anomaly_types:
        types = [anomaly_types] if isinstance(anomaly_types, str) else list(anomaly_types)
    if types:
        where.append("a.type = ANY(%s)")
        params.append(types)
    # Only fixed clause strings are interpolated; every value is a %s parameter.
    where_sql = " AND ".join(where)
    # "with conn" in psycopg2 only ends the transaction; closing() closes the connection.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                f"""
SELECT COUNT(*)
FROM anomalies a
WHERE {where_sql};
""",
                params,
            )
            count = cur.fetchone()[0]
    return {"has_anomalies": count > 0, "count": count}
def build_snapshot(now, from_ts, to_ts, mode, station_focus=None, max_stations=None):
    """Assemble the snapshot_v1 document given to the LLM.

    It combines the network summary, the top stations (optionally capped further
    by ``max_stations``), a spike/stuck anomaly count for the range (filtered by
    ``station_focus`` if given), the UI context and the advertised limits.
    """
    summary = fetch_summary(now, from_ts, to_ts)
    selection = select_llm_stations(fetch_stations(now))
    if max_stations and max_stations < selection["stations_in_snapshot"]:
        trimmed = selection["stations"][:max_stations]
        selection.update(
            stations=trimmed,
            stations_in_snapshot=len(trimmed),
            stations_truncated=selection["stations_total"] > len(trimmed),
        )
    anomalies_summary = fetch_anomalies_summary(
        from_ts,
        to_ts,
        anomaly_types=["spike", "stuck"],
        station_id=station_focus,
    )
    ui_block = {
        "mode": mode,
        "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts), "tz": UI_TZ},
        "station_focus": station_focus,
    }
    limits_block = {
        "max_tool_iterations": MAX_TOOL_ITERATIONS,
        "max_range_hours_snapshot": MAX_RANGE_HOURS_SNAPSHOT,
        "max_range_days_anomalies": MAX_RANGE_DAYS_ANOMALIES,
        "max_range_days_history": MAX_RANGE_DAYS_HISTORY,
        "max_stations_in_snapshot": MAX_STATIONS_IN_SNAPSHOT,
        "max_anomalies": MAX_ANOMALIES,
        "max_history_points": MAX_HISTORY_POINTS,
        "min_history_step_seconds": MIN_HISTORY_STEP_SECONDS,
    }
    data_block = {
        "summary": summary,
        "stations": selection["stations"],
        "anomalies_summary": anomalies_summary,
        "stations_in_snapshot": selection["stations_in_snapshot"],
        "stations_truncated": selection["stations_truncated"],
        "selection_rule": selection["selection_rule"],
    }
    return {
        "type": "snapshot_v1",
        "request_id": str(uuid.uuid4()),
        "generated_at": dt_iso(now),
        "ui": ui_block,
        "limits": limits_block,
        "data": data_block,
        "tools_whitelist": TOOLS_WHITELIST,
    }
def normalize_json(value):
    """Decode JSON text into Python objects and pass every other value through.

    Non-string values (including None, dicts and lists) are returned unchanged.
    Strings that are not valid JSON are also returned unchanged.
    """
    if not isinstance(value, str):
        return value
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return value
def fetch_anomalies(from_ts, to_ts, limit, station_id, anomaly_types):
    """Return up to ``limit`` anomalies with ts_start in [from_ts, to_ts], newest first.

    Optional filters: ``station_id`` and ``anomaly_types`` (a type string or an
    iterable of types). Timestamps are serialized with dt_iso and ``evidence`` is
    decoded with normalize_json.

    Raises:
        RuntimeError: if DATABASE_URL is not configured.
    """
    require_db_url()
    params = [from_ts, to_ts]
    where = ["a.ts_start >= %s", "a.ts_start <= %s"]
    if station_id is not None:
        where.append("a.station_id = %s")
        params.append(station_id)
    types = None
    if anomaly_types:
        types = [anomaly_types] if isinstance(anomaly_types, str) else list(anomaly_types)
    if types:
        where.append("a.type = ANY(%s)")
        params.append(types)
    params.append(limit)
    # Only fixed clause strings are interpolated; every value is a %s parameter.
    where_sql = " AND ".join(where)
    # "with conn" in psycopg2 only ends the transaction; closing() closes the connection.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                f"""
SELECT a.id, a.station_id, s.name, a.type, a.severity, a.confidence,
a.ts_start, a.ts_end, a.metric, a.evidence, a.created_at
FROM anomalies a
LEFT JOIN stations s ON s.id = a.station_id
WHERE {where_sql}
ORDER BY a.ts_start DESC
LIMIT %s;
""",
                params,
            )
            rows = cur.fetchall()
    items = []
    for (
        anomaly_id,
        row_station_id,
        station_name,
        row_type,
        severity,
        confidence,
        ts_start,
        ts_end,
        metric,
        evidence,
        created_at,
    ) in rows:
        items.append(
            {
                "id": anomaly_id,
                "station_id": row_station_id,
                "station_name": station_name,
                "type": row_type,
                "severity": severity,
                "confidence": confidence,
                "ts_start": dt_iso(ts_start),
                "ts_end": dt_iso(ts_end),
                "metric": metric,
                "evidence": normalize_json(evidence),
                "created_at": dt_iso(created_at),
            }
        )
    return {
        "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)},
        "items": items,
    }
def fetch_history(from_ts, to_ts, since_ts):
    """Return raw measurements with from_ts <= observed_ts < to_ts, oldest first.

    When ``since_ts`` is later than ``from_ts``, only rows strictly newer than
    ``since_ts`` are returned. Each row is joined with its station's metadata.

    Raises:
        RuntimeError: if DATABASE_URL is not configured.
    """
    require_db_url()
    params = [from_ts, to_ts]
    conditions = ["m.observed_ts >= %s", "m.observed_ts < %s"]
    if since_ts and since_ts > from_ts:
        conditions.append("m.observed_ts > %s")
        params.append(since_ts)
    where_sql = " AND ".join(conditions)
    # "with conn" in psycopg2 only ends the transaction; closing() closes the connection.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                f"""
SELECT m.station_id,
s.provider_uid,
s.name,
s.lat,
s.lon,
m.observed_ts,
m.ingested_ts,
m.aqi
FROM measurements m
JOIN stations s ON s.id = m.station_id
WHERE {where_sql}
ORDER BY m.observed_ts ASC, m.station_id ASC;
""",
                params,
            )
            rows = cur.fetchall()
    items = []
    for (
        station_id,
        provider_uid,
        name,
        lat,
        lon,
        observed_ts,
        ingested_ts,
        aqi,
    ) in rows:
        items.append(
            {
                "station_id": station_id,
                "provider_uid": provider_uid,
                "name": name,
                "lat": lat,
                "lon": lon,
                "observed_ts": dt_iso(observed_ts),
                "ingested_ts": dt_iso(ingested_ts),
                "aqi": aqi,
            }
        )
    return items
def fetch_history_bounds():
    """Return (earliest, latest) observed_ts across all measurements.

    Both values are None when the table is empty (the aggregates return NULL).

    Raises:
        RuntimeError: if DATABASE_URL is not configured.
    """
    require_db_url()
    # "with conn" in psycopg2 only ends the transaction; closing() closes the connection.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                """
SELECT MIN(observed_ts), MAX(observed_ts)
FROM measurements;
"""
            )
            row = cur.fetchone()
    if not row:
        return None, None
    return row[0], row[1]
def compute_history_step(from_ts, to_ts, requested_step_seconds):
    """Choose the bucket width (seconds) for a station history series.

    The width is at least MIN_HISTORY_STEP_SECONDS and is widened as needed so the
    range yields at most about MAX_HISTORY_POINTS buckets.
    """
    span_seconds = max((to_ts - from_ts).total_seconds(), 1)
    step = max(requested_step_seconds or MIN_HISTORY_STEP_SECONDS, MIN_HISTORY_STEP_SECONDS)
    points_floor = int(span_seconds / MAX_HISTORY_POINTS) + 1
    return max(step, points_floor)
def fetch_station_history(station_id, from_ts, to_ts, step_seconds):
    """Return a bucketed AQI time series for one station over [from_ts, to_ts].

    Measurements are grouped into buckets of compute_history_step() seconds aligned
    to ``from_ts``. Each point is the mean of the non-null AQI values in its bucket,
    and buckets with no AQI values are omitted.

    Raises:
        RuntimeError: if DATABASE_URL is not configured.
    """
    require_db_url()
    # "with conn" in psycopg2 only ends the transaction; closing() closes the connection.
    with closing(psycopg2.connect(DATABASE_URL)) as conn:
        with conn, conn.cursor() as cur:
            cur.execute(
                """
SELECT observed_ts, aqi
FROM measurements
WHERE station_id = %s
AND observed_ts >= %s
AND observed_ts <= %s
ORDER BY observed_ts ASC;
""",
                [station_id, from_ts, to_ts],
            )
            rows = cur.fetchall()
    step_seconds = compute_history_step(from_ts, to_ts, step_seconds)
    buckets = {}
    for observed_ts, aqi in rows:
        index = int((observed_ts - from_ts).total_seconds() // step_seconds)
        bucket_ts = from_ts + timedelta(seconds=index * step_seconds)
        bucket = buckets.setdefault(bucket_ts, {"sum": 0.0, "count": 0})
        if aqi is not None:
            bucket["sum"] += float(aqi)
            bucket["count"] += 1
    points = []
    for bucket_ts in sorted(buckets):
        count = buckets[bucket_ts]["count"]
        if count <= 0:
            continue
        points.append(
            {
                "ts": dt_iso(bucket_ts),
                "aqi": buckets[bucket_ts]["sum"] / count,
            }
        )
    return {
        "station_id": station_id,
        "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)},
        "step_seconds": step_seconds,
        "points": points,
    }
# System prompt for the routing call in run_router(). The model must reply with
# router_decision_v1 JSON (schema built by router_response_format()); the backend
# re-validates the reply in normalize_router_decision().
ROUTER_PROMPT = (
    "Ты — Router AIP Ecology. Верни ТОЛЬКО JSON по схеме router_decision_v1.\n"
    "Всегда указывай request_type: smalltalk | meta | eco.\n"
    "\n"
    "Определение request_type:\n"
    "- smalltalk: привет/как дела/спасибо/ок/анекдот/тест/поболтать.\n"
    "- meta: вопросы «почему ты», «при чем тут», «ты о чем», вопросы о системе/промптах/tools/snapshot.\n"
    "- eco: вопросы про воздух/AQI/станции/аномалии/историю/диапазон/свежесть.\n"
    "\n"
    "Правила:\n"
    "- Если request_type = smalltalk или meta -> type = no_tool.\n"
    "- Если request_type = eco и snapshot_v1 достаточно -> type = no_tool, reason = snapshot_enough.\n"
    "- Если request_type = eco и нужны детали -> type = tool_call_v1.\n"
    "Инструменты:\n"
    "- get_anomalies: списки/подтверждение аномалий.\n"
    "- get_station_history: таймсерия по станции.\n"
    "- get_snapshot: только если нужен другой диапазон/режим или snapshot отсутствует.\n"
    "Без лишнего текста."
)
# System prompt for the final answer. It restates the per-REQUEST_TYPE rules and the
# exact wording for tool errors ("ошибка инструмента: <code> — <message>") and empty
# results ("нет данных").
ANSWER_PROMPT = (
    "Ты — аналитик AIP Ecology.\n"
    "REQUEST_TYPE уже определён на backend. Не переклассифицируй.\n"
    "\n"
    "Правила по REQUEST_TYPE:\n"
    "- SMALLTALK/TEST: ответь кратко (1-2 предложения) на языке пользователя. "
    "НЕ упоминай snapshot_v1, tool_result_v1, качество воздуха/AQI, мониторинг, станции, аномалии, диапазоны, временные метки, "
    "\"нет данных\", ingestion/observation или аналитику.\n"
    "- META: кратко объясни на языке пользователя. "
    "Не анализируй качество воздуха и не упоминай конкретные станции, если явно не спросили.\n"
    "- ECO: отвечай ТОЛЬКО по SNAPSHOT_JSON и TOOL_RESULT. Не выдумывай.\n"
    "\n"
    "Правила ECO:\n"
    "- Не добавляй строку вида \"Период: ...\" — период показывает UI.\n"
    "- Упоминай аномалии ТОЛЬКО если пользователь спросил про аномалии/риски ИЛИ TOOL_RESULT.tool == get_anomalies.\n"
    "- Если TOOL_RESULT есть и ok=false: ответь кратко: \"ошибка инструмента: <code> — <message>\".\n"
    "- Если TOOL_RESULT items/points пустые ИЛИ error.code == NO_DATA: ответ строго: \"нет данных\".\n"
    "- Если есть ANOMALIES_BRIEF: используй только его факты и числа, не выводи ISO/JSON.\n"
    # The ranges below had lost their dashes ("12", "25"); an ASCII hyphen is used,
    # matching "(1-2 предложения)" above.
    " Обычно достаточно 1-2 связных предложения, затем 2-5 коротких строк с примерами.\n"
    "\n"
    "Стиль: деловой и лаконичный. Эмодзи допустимы, но умеренно. Без лишних вступлений и без пустых фраз.\n"
    "Отвечай на языке пользователя."
)
def router_response_format():
    """Build the ``response_format`` payload constraining router replies.

    The JSON schema "router_decision_v1" accepts exactly one of two shapes:
    a tool call (type "tool_call_v1" with id, whitelisted tool and args object) or
    "no_tool" with an optional reason. Both require a request_type from REQUEST_TYPES.
    """
    def _request_type_schema():
        return {"type": "string", "enum": REQUEST_TYPES}

    tool_call_variant = {
        "type": "object",
        "required": ["type", "request_type", "id", "tool", "args"],
        "properties": {
            "type": {"const": "tool_call_v1"},
            "request_type": _request_type_schema(),
            "id": {"type": "string", "minLength": 1},
            "tool": {"type": "string", "enum": TOOLS_WHITELIST},
            "args": {"type": "object"},
        },
        "additionalProperties": False,
    }
    no_tool_variant = {
        "type": "object",
        "required": ["type", "request_type"],
        "properties": {
            "type": {"const": "no_tool"},
            "request_type": _request_type_schema(),
            "reason": {"type": "string"},
        },
        "additionalProperties": False,
    }
    schema = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "router_decision_v1",
        "type": "object",
        "oneOf": [tool_call_variant, no_tool_variant],
    }
    return {
        "type": "json_schema",
        "json_schema": {"name": "router_decision_v1", "strict": True, "schema": schema},
    }
def call_llm(messages, temperature, max_tokens, response_format=None):
    """POST a chat-completions request and return the first choice's message content.

    Raises:
        RuntimeError: if LLM_BASE_URL is not configured.
        HTTPException: 502 on transport errors, HTTP status >= 400, a non-JSON body,
            or a body without choices[0].message.content.
    """
    require_llm_base_url()
    payload = {
        "model": LLM_MODEL,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    if response_format:
        payload["response_format"] = response_format
    endpoint = f"{LLM_BASE_URL}/v1/chat/completions"
    try:
        with httpx.Client(timeout=LLM_TIMEOUT) as client:
            response = client.post(endpoint, json=payload)
    except httpx.RequestError as exc:
        raise HTTPException(status_code=502, detail=f"LLM request failed: {exc}") from exc
    if response.status_code >= 400:
        raise HTTPException(
            status_code=502,
            detail=f"LLM error {response.status_code}: {response.text}",
        )
    # Valid UTF-8 decodes identically; invalid bytes become U+FFFD instead of failing.
    body_text = response.content.decode("utf-8", errors="replace")
    try:
        body = json.loads(body_text)
    except json.JSONDecodeError as exc:
        raise HTTPException(status_code=502, detail="LLM response invalid JSON") from exc
    try:
        return body["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        raise HTTPException(status_code=502, detail="LLM response invalid") from exc
def parse_json_content(text):
    """Parse model output as JSON, tolerating surrounding prose.

    The whole text is tried first. If that fails, the span from the first "{" to
    the last "}" is tried. Returns None when neither parses or ``text`` is empty.
    """
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    first_brace, last_brace = text.find("{"), text.rfind("}")
    if first_brace < 0 or last_brace <= first_brace:
        return None
    try:
        return json.loads(text[first_brace : last_brace + 1])
    except json.JSONDecodeError:
        return None
def normalize_request_type(value, reason, message):
    """Coerce the router's request_type into one of REQUEST_TYPES.

    Order of preference: the given value (trimmed, lower-cased), a "smalltalk" or
    "meta" hint in ``reason``, classify_message(message), and finally "eco".
    """
    candidate = value.strip().lower() if isinstance(value, str) else value
    if candidate in REQUEST_TYPES:
        return candidate
    hint = (reason or "").lower()
    for label in ("smalltalk", "meta"):
        if label in hint:
            return label
    guessed = classify_message(message) if message else "eco"
    return guessed if guessed in REQUEST_TYPES else "eco"
def normalize_router_decision(decision, message):
    """Validate a parsed router reply and return a well-formed decision dict.

    Valid tool calls (whitelisted tool, dict args) are kept, with an id generated if
    missing. Valid no_tool replies default their reason to "snapshot_enough".
    Anything else becomes a no_tool decision with router_invalid_json or
    router_invalid_tool as the reason. request_type is always normalized.
    """
    if not isinstance(decision, dict):
        return {
            "type": "no_tool",
            "reason": "router_invalid_json",
            "request_type": normalize_request_type(None, None, message),
        }
    kind = decision.get("type")
    reason = decision.get("reason")
    request_type = normalize_request_type(decision.get("request_type"), reason, message)
    if kind == "no_tool":
        return {
            "type": "no_tool",
            "reason": reason or "snapshot_enough",
            "request_type": request_type,
        }
    if kind != "tool_call_v1":
        return {
            "type": "no_tool",
            "reason": "router_invalid_json",
            "request_type": request_type,
        }
    tool, args = decision.get("tool"), decision.get("args")
    if tool not in TOOLS_WHITELIST or not isinstance(args, dict):
        return {
            "type": "no_tool",
            "reason": "router_invalid_tool",
            "request_type": request_type,
        }
    return {
        "type": "tool_call_v1",
        "id": decision.get("id") or f"call_{uuid.uuid4().hex[:8]}",
        "tool": tool,
        "args": args,
        "request_type": request_type,
    }
def run_router(snapshot_json, message):
    """Ask the router LLM which action to take for ``message``.

    The model sees the serialized snapshot and the question and must reply with
    router_decision_v1 JSON. If the reply is non-empty text that cannot be parsed,
    one repair call asks the model to re-emit it as schema-conforming JSON. The
    result always goes through normalize_router_decision(), which falls back to a
    no_tool decision when the JSON is still unusable.
    """
    messages = [
        {"role": "system", "content": ROUTER_PROMPT},
        {"role": "user", "content": f"SNAPSHOT_JSON: {snapshot_json}"},
        {"role": "user", "content": f"QUESTION: {message}"},
    ]
    content = call_llm(
        messages,
        temperature=0,
        max_tokens=200,
        response_format=router_response_format(),
    )
    decision = parse_json_content(content)
    # Only attempt a repair when there is text to repair. Sending a None/empty
    # "content" message may be rejected by the server, which would surface as a 502
    # from call_llm instead of using normalize_router_decision's fallback.
    if decision is None and isinstance(content, str) and content.strip():
        repair_messages = [
            {
                "role": "system",
                "content": "Return only JSON that matches the schema. Include request_type. No extra text.",
            },
            {"role": "user", "content": content},
        ]
        repair_content = call_llm(
            repair_messages,
            temperature=0,
            max_tokens=200,
            response_format=router_response_format(),
        )
        decision = parse_json_content(repair_content)
    return normalize_router_decision(decision, message)
def snapshot_range_args(snapshot):
    """Return the snapshot's UI range as tool ``range`` args (tz defaults to UI_TZ)."""
    selected = snapshot["ui"]["range"]
    start, end = selected["from"], selected["to"]
    zone_name = selected.get("tz") or UI_TZ
    return {"from": start, "to": end, "tz": zone_name}
def build_period_payload(snapshot, tool_result=None):
    """Return the {"from", "to", "tz"} period to display.

    The range of a successful tool result takes priority; otherwise the snapshot's
    UI range is used. Returns None when either bound is missing.
    """
    chosen = None
    if tool_result and tool_result.get("ok"):
        chosen = (tool_result.get("data") or {}).get("range")
    chosen = chosen or snapshot.get("ui", {}).get("range") or {}
    start, end = chosen.get("from"), chosen.get("to")
    if not (start and end):
        return None
    return {"from": start, "to": end, "tz": chosen.get("tz") or UI_TZ}
def wants_anomaly_details(message):
    """True when the message is about anomalies and asks for details or a listing."""
    lowered = (message or "").lower()
    return text_has_any(lowered, ANOMALY_TOPIC_TRIGGERS) and text_has_any(
        lowered, ANOMALY_DETAIL_TRIGGERS
    )
def wants_anomaly_full_list(message):
    """True when the message is about anomalies and asks for the complete list."""
    lowered = (message or "").lower()
    return text_has_any(lowered, ANOMALY_TOPIC_TRIGGERS) and text_has_any(
        lowered, ANOMALY_FULL_LIST_TRIGGERS
    )
def wants_anomaly_chrono(message):
    """True when the message asks for chronological ordering."""
    return text_has_any((message or "").lower(), ANOMALY_TIME_ORDER_TRIGGERS)
def wants_history(message):
    """True when the message asks for a time series / history."""
    return text_has_any((message or "").lower(), HISTORY_TRIGGERS)
def apply_fallbacks(decision, message, snapshot, station_focus, message_type):
    """Override a no_tool router decision with a keyword-driven tool call for eco messages.

    - Anomaly-detail requests become a get_anomalies call over the snapshot range.
    - History requests that name a station (or have ``station_focus``) become a
      get_station_history call.
    Non-eco messages and decisions that already call a tool are returned unchanged.
    Synthesized calls carry ``request_type`` like normalize_router_decision() output.
    """
    if message_type != "eco":
        return decision
    if decision["type"] != "no_tool":
        return decision
    station_id = station_focus or extract_station_id(message)
    if wants_anomaly_details(message):
        return {
            "type": "tool_call_v1",
            "id": f"call_{uuid.uuid4().hex[:8]}",
            "tool": "get_anomalies",
            "args": {
                "range": snapshot_range_args(snapshot),
                "types": ["spike", "stuck"],
                "limit": MAX_ANOMALIES,
            },
            "request_type": message_type,
        }
    if station_id and wants_history(message):
        return {
            "type": "tool_call_v1",
            "id": f"call_{uuid.uuid4().hex[:8]}",
            "tool": "get_station_history",
            "args": {
                "station_id": station_id,
                "range": snapshot_range_args(snapshot),
                "step_seconds": MIN_HISTORY_STEP_SECONDS,
            },
            "request_type": message_type,
        }
    return decision
def sanitize_tool_call(decision, station_focus, snapshot):
    """Fill in and validate tool-call arguments before execution.

    get_station_history: station_id falls back to ``station_focus`` and must be
    int-convertible, otherwise a no_tool decision is returned. get_anomalies:
    ``max_anomalies`` is copied to ``limit`` when no limit was given. Both tools get
    the snapshot range when ``range`` is missing. Non-tool decisions pass through.
    """
    if decision.get("type") != "tool_call_v1":
        return decision
    tool_name = decision.get("tool")
    args = decision.get("args") or {}
    if not isinstance(args, dict):
        args = {}

    def _reject(reason):
        return {
            "type": "no_tool",
            "reason": reason,
            "request_type": decision.get("request_type") or "eco",
        }

    if tool_name == "get_station_history":
        raw_station = args.get("station_id")
        if raw_station is None:
            raw_station = station_focus
        if raw_station is None:
            return _reject("history_requires_station_id")
        try:
            args["station_id"] = int(raw_station)
        except (TypeError, ValueError):
            return _reject("history_invalid_station_id")
        if not args.get("range"):
            args["range"] = snapshot_range_args(snapshot)
    elif tool_name == "get_anomalies":
        if not args.get("range"):
            args["range"] = snapshot_range_args(snapshot)
        if "limit" not in args and "max_anomalies" in args:
            try:
                args["limit"] = int(args["max_anomalies"])
            except (TypeError, ValueError):
                pass  # best effort: leave the default limit to the tool
    decision["args"] = args
    return decision
def tool_result_is_no_data(tool_result):
    """True when a tool result means "no data".

    This is the case for a failed result with error code NO_DATA (any case), or a
    successful get_station_history result with no points. Empty anomaly lists are
    not treated as "no data".
    """
    if not tool_result:
        return False
    if tool_result.get("ok") is False:
        error_code = str((tool_result.get("error") or {}).get("code") or "").upper()
        return error_code == "NO_DATA"
    if tool_result.get("tool") != "get_station_history":
        return False
    return not (tool_result.get("data") or {}).get("points")
def format_tool_error(tool_result):
    """Format a failed tool result as "ошибка инструмента: <code> — <message>".

    This follows the wording ANSWER_PROMPT prescribes for ok=false results. The code
    defaults to "ERROR", and the " — <message>" part is omitted when there is no message.
    """
    err = (tool_result or {}).get("error") or {}
    code = err.get("code") or "ERROR"
    message = err.get("message") or ""
    if message:
        # Separator is " — " (EM DASH); without it code and message ran together.
        return f"ошибка инструмента: {code} \u2014 {message}"
    return f"ошибка инструмента: {code}"
def strip_iso_period_lines(text):
    """Remove "Период: <ts> - <ts>" / "Period: <ts> - <ts>" lines from an answer.

    ANSWER_PROMPT tells the model not to emit such lines (the UI shows the period),
    so any that appear are dropped. A timestamp may use "T" or a space between date
    and time. The separator may be a hyphen, an en dash or an em dash. If removing
    lines would leave nothing, the original text is returned.
    """
    if not text:
        return text
    iso_ts = r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}(?::\d{2})?(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?"
    # Hyphen, EN DASH (U+2013) or EM DASH (U+2014); the dashes had been lost from
    # this class, leaving only "-". Escapes keep look-alike characters out of source.
    dash = "[-\u2013\u2014]"
    pattern = re.compile(rf"^\s*(?:Период|Period)\s*:\s*{iso_ts}\s*{dash}\s*{iso_ts}\s*$")
    lines = str(text).replace("\r", "").split("\n")
    kept = [line for line in lines if not pattern.match(line.strip())]
    cleaned = "\n".join(kept).strip()
    return cleaned or text
def clean_station_name(name, station_id=None):
    """Return the station name without a trailing " (...)" suffix.

    When the name is empty, falls back to "station <id>", or to "station" if no id is given.
    """
    if name:
        head, _, _ = name.partition(" (")
        return head
    return "station" if station_id is None else f"station {station_id}"
def parse_local_dt(value):
    """Parse an ISO string and convert it to LOCAL_TZ; invalid or empty input gives None."""
    if not value:
        return None
    try:
        parsed = parse_dt(value)
    except HTTPException:
        parsed = None
    return parsed.astimezone(LOCAL_TZ) if parsed is not None else None
def format_time_local(dt):
    """Return "HH:MM" for ``dt``, or None when ``dt`` is falsy."""
    if not dt:
        return None
    return dt.strftime("%H:%M")
def format_date_local(dt):
    """Return "DD.MM" for ``dt``, or None when ``dt`` is falsy."""
    if not dt:
        return None
    return dt.strftime("%d.%m")
def is_utc_day_range(from_ts, to_ts):
    """Return True if the two ISO strings cover one whole calendar day.

    Both strings must share the same "YYYY-MM-DD" prefix. The start must be 00:00
    (optionally :00 and a fraction) and the end 23:59 (optionally :59 and a
    fraction). Either may carry a "Z" or ±HH:MM suffix.
    """
    if not from_ts or not to_ts:
        return False
    if from_ts[:10] != to_ts[:10]:
        return False
    # Raw strings take single backslashes: r"\\d" would match a literal backslash
    # followed by "d", so offsets and fractional seconds would never match.
    start_ok = re.search(r"T00:00(?::00(?:\.\d+)?)?(?:Z|[+-]\d{2}:\d{2})?$", from_ts) is not None
    end_ok = re.search(r"T23:59(?::59(?:\.\d+)?)?(?:Z|[+-]\d{2}:\d{2})?$", to_ts) is not None
    return start_ok and end_ok
def range_span_days(range_info):
    """True if the range's local (LOCAL_TZ) start and end fall on different dates.

    Returns False when either bound is missing or unparsable.
    """
    start = parse_local_dt(range_info.get("from"))
    end = parse_local_dt(range_info.get("to"))
    return bool(start and end) and start.date() != end.date()
def build_period_label(range_info):
    """Return a "DD.MM" label when the range covers a single day, else None.

    A whole-day range (is_utc_day_range) is labeled by its date prefix. Otherwise
    the label comes from the local start date when start and end share a local date.
    """
    start_raw, end_raw = range_info.get("from"), range_info.get("to")
    if is_utc_day_range(start_raw, end_raw):
        try:
            return datetime.strptime(start_raw[:10], "%Y-%m-%d").strftime("%d.%m")
        except ValueError:
            return None
    start_local = parse_local_dt(start_raw)
    end_local = parse_local_dt(end_raw)
    same_day = bool(start_local and end_local) and start_local.date() == end_local.date()
    return format_date_local(start_local) if same_day else None
def dedupe_anomalies(items):
    """Drop duplicate anomalies, keeping the first occurrence and the input order.

    Items are duplicates when station_id, type, ts_start, ts_end and evidence["delta"]
    all match. Evidence that is missing or not a JSON object contributes no delta.
    """
    seen = set()
    cleaned = []
    for item in items:
        evidence = item.get("evidence")
        # normalize_json() can return a string, list or number; only dicts have keys.
        delta = evidence.get("delta") if isinstance(evidence, dict) else None
        key = (
            item.get("station_id"),
            item.get("type"),
            item.get("ts_start"),
            item.get("ts_end"),
            delta,
        )
        if key in seen:
            continue
        seen.add(key)
        cleaned.append(item)
    return cleaned
def anomaly_key(item):
    """Identity tuple of an anomaly: (station_id, type, ts_start, ts_end, evidence delta).

    Evidence that is missing or not a JSON object contributes a None delta.
    """
    evidence = item.get("evidence")
    # normalize_json() can return a string, list or number; only dicts have keys.
    delta = evidence.get("delta") if isinstance(evidence, dict) else None
    return (
        item.get("station_id"),
        item.get("type"),
        item.get("ts_start"),
        item.get("ts_end"),
        delta,
    )
def anomaly_delta(item):
    """Sort key: evidence["delta"] when it is a real number, otherwise -1.

    Evidence that is missing or not a JSON object also yields -1.
    """
    evidence = item.get("evidence")
    # normalize_json() can return a string, list or number; only dicts have keys.
    delta = evidence.get("delta") if isinstance(evidence, dict) else None
    # bool is a subclass of int, but a True/False flag is not a magnitude.
    if isinstance(delta, (int, float)) and not isinstance(delta, bool):
        return delta
    return -1
def anomaly_time_label(item, include_date):
    """Return the anomaly's local start time as "HH:MM", or "DD.MM HH:MM" when dated.

    Returns None when ts_start is missing or unparsable.
    """
    started = parse_local_dt(item.get("ts_start"))
    if not started:
        return None
    clock = format_time_local(started)
    if not include_date:
        return clock
    day = format_date_local(started)
    if day and clock:
        return f"{day} {clock}"
    return day or clock
def build_anomalies_brief(tool_result, top_n=MAX_ANOMALIES_BRIEF):
    """Condense a get_anomalies tool result into a small summary dict.

    Returns the de-duplicated count, the number of distinct stations (by id, or by
    name when the id is missing; None if neither is known), a single-day period
    label, the strongest anomaly by evidence delta, and up to ``top_n`` further
    anomalies in descending-delta order (the strongest excluded).
    NOTE(review): presumably passed to the answer prompt as ANOMALIES_BRIEF — the
    caller is outside this view.
    """
    data = (tool_result or {}).get("data") or {}
    items = data.get("items") or []
    cleaned = dedupe_anomalies(items)
    range_info = data.get("range") or {}
    include_date = range_span_days(range_info)
    period_label = build_period_label(range_info)
    stations = set()
    for item in cleaned:
        station_id = item.get("station_id")
        if station_id is not None:
            stations.add(station_id)
            continue
        station_name = item.get("station_name")
        if station_name:
            stations.add(station_name)
    stations_count = len(stations) if stations else None
    sorted_by_delta = sorted(cleaned, key=anomaly_delta, reverse=True)
    strongest_item = sorted_by_delta[0] if sorted_by_delta else None
    strongest_key = anomaly_key(strongest_item) if strongest_item else None

    def to_brief(item):
        evidence = item.get("evidence")
        # normalize_json() can return a string, list or number; treat as no evidence.
        if not isinstance(evidence, dict):
            evidence = {}
        return {
            "station_name": clean_station_name(item.get("station_name"), item.get("station_id")),
            "time_msk": anomaly_time_label(item, include_date),
            "prev": evidence.get("prev_aqi"),
            "curr": evidence.get("curr_aqi"),
            "delta": evidence.get("delta"),
            "kind": item.get("type"),
        }

    strongest = to_brief(strongest_item) if strongest_item else None
    top_items = []
    for item in sorted_by_delta:
        if strongest_key and anomaly_key(item) == strongest_key:
            continue
        top_items.append(to_brief(item))
        if len(top_items) >= top_n:
            break
    return {
        "count": len(cleaned),
        "stations_count": stations_count,
        "period_label": period_label,
        "strongest": strongest,
        "top_items": top_items,
    }
def format_anomalies_full_list(tool_result, max_items=MAX_ANOMALIES_FULL_LIST, chrono=False):
    """Render the anomalies of a ``get_anomalies`` tool result as text lines.

    Each line reads ``"<station>, <time> — <prev> → <curr> (<±delta>)"``. When
    only the delta is known, it reads ``"<station>, <time> — Δ <delta>"``. The
    time part is omitted when no time label is available.

    Args:
        tool_result: ``tool_result_v1`` dict; ``data.items`` and ``data.range``
            are read. ``None`` or an empty item list yields a "nothing found"
            message.
        max_items: Maximum number of anomaly lines to emit; negative values are
            treated as 0.
        chrono: When true, sort by ``ts_start`` ascending; otherwise sort by
            delta, largest first.

    Returns:
        A newline-joined string. If the list was truncated, a trailing hint line
        states how many anomalies were shown.
    """
    data = (tool_result or {}).get("data") or {}
    items = data.get("items") or []
    if not items:
        return (
            "За выбранный период аномалий не найдено. "
            "Можно проверить 24 часа / 3 дня / другую станцию."
        )
    cleaned = dedupe_anomalies(items)
    range_info = data.get("range") or {}
    include_date = range_span_days(range_info)
    # sorted() rather than list.sort(): never reorder a list that may be shared
    # with the tool result returned to the client.
    if chrono:
        ordered = sorted(cleaned, key=lambda x: x.get("ts_start") or "")
    else:
        ordered = sorted(cleaned, key=anomaly_delta, reverse=True)
    # Clamp so a negative max_items cannot become a "drop the last N" slice.
    limit = max(0, min(len(ordered), max_items))
    lines = []
    for item in ordered[:limit]:
        evidence = item.get("evidence") or {}
        station = clean_station_name(item.get("station_name"), item.get("station_id"))
        time_label = anomaly_time_label(item, include_date)
        prev = evidence.get("prev_aqi")
        curr = evidence.get("curr_aqi")
        delta = evidence.get("delta")
        parts = [station]
        if time_label:
            parts.append(time_label)
        # Use explicit separators; an empty join glued station and time together.
        line = ", ".join(parts)
        if prev is not None and curr is not None and delta is not None:
            # Only prefix "+" for non-negative deltas, avoiding "+-5".
            if isinstance(delta, (int, float)) and delta < 0:
                delta_text = f"{delta}"
            else:
                delta_text = f"+{delta}"
            line = f"{line} — {prev} → {curr} ({delta_text})"
        elif delta is not None:
            line = f"{line} — Δ {delta}"
        lines.append(line)
    if len(ordered) > limit:
        lines.append(
            f"Показала первые {limit}. Можно сузить период или выбрать станцию."
        )
    return "\n".join(lines)
def validate_range(from_ts, to_ts, max_hours=None, max_days=None):
    """Check a time range and return an ``(error_code, error_message)`` pair.

    Both endpoints must be present, and ``from_ts`` must not be after ``to_ts``.
    When ``max_hours`` / ``max_days`` are truthy, the span must not exceed them;
    the hour limit is checked first. Returns ``(None, None)`` when the range is
    acceptable.
    """
    if from_ts is None or to_ts is None:
        return "INVALID_RANGE", "Range is required"
    if to_ts < from_ts:
        return "INVALID_RANGE", "'from' must be <= 'to'"
    span = (to_ts - from_ts).total_seconds()
    caps = ((max_hours, 3600, "hours"), (max_days, 86400, "days"))
    for cap, unit_seconds, unit_name in caps:
        if cap and span > cap * unit_seconds:
            return "RANGE_TOO_LARGE", f"Range exceeds {cap} {unit_name}"
    return None, None
def tool_result_ok(tool_id, tool, data):
    """Wrap a successful tool payload in the ``tool_result_v1`` envelope."""
    return dict(
        type="tool_result_v1",
        id=tool_id,
        tool=tool,
        ok=True,
        data=data,
    )
def tool_result_error(tool_id, tool, code, message):
    """Wrap a tool failure (*code*, *message*) in the ``tool_result_v1`` envelope."""
    failure = dict(code=code, message=message)
    return dict(
        type="tool_result_v1",
        id=tool_id,
        tool=tool,
        ok=False,
        error=failure,
    )
def handle_get_snapshot(args):
    """Run the ``get_snapshot`` tool for router-supplied *args*.

    Expected args:
        range:  ``{"from": ..., "to": ...}`` (required). The span may be at
                most MAX_RANGE_HOURS_SNAPSHOT hours.
        limits: ``{"stations": <int>}`` (optional). Clamped to
                MAX_STATIONS_IN_SNAPSHOT. Missing, non-integer or non-positive
                values fall back to that maximum.

    Returns:
        A ``tool_result_v1`` dict. Its ``id`` is the placeholder ``"missing"``,
        which ``execute_tool_call`` overwrites with the real call id.
    """
    # Router output is LLM-generated, so tolerate non-dict payloads instead of
    # crashing on ``.get``.
    if not isinstance(args, dict):
        args = {}
    range_args = args.get("range")
    if not range_args or not isinstance(range_args, dict):
        return tool_result_error("missing", "get_snapshot", "INVALID_RANGE", "Range is required")
    from_ts = parse_dt(range_args.get("from"))
    to_ts = parse_dt(range_args.get("to"))
    code, message = validate_range(from_ts, to_ts, max_hours=MAX_RANGE_HOURS_SNAPSHOT)
    if code:
        return tool_result_error("missing", "get_snapshot", code, message)
    limits = args.get("limits")
    if not isinstance(limits, dict):
        limits = {}
    try:
        stations_limit = int(limits.get("stations") or MAX_STATIONS_IN_SNAPSHOT)
    except (TypeError, ValueError, OverflowError):
        stations_limit = MAX_STATIONS_IN_SNAPSHOT
    # A non-positive limit would turn the slice below into "drop the last N".
    if stations_limit < 1:
        stations_limit = MAX_STATIONS_IN_SNAPSHOT
    stations_limit = min(stations_limit, MAX_STATIONS_IN_SNAPSHOT)
    now = datetime.now(timezone.utc)
    summary = fetch_summary(now, from_ts, to_ts)
    selection = select_llm_stations(fetch_stations(now))
    stations = selection["stations"][:stations_limit]
    anomalies_summary = fetch_anomalies_summary(
        from_ts, to_ts, anomaly_types=["spike", "stuck"]
    )
    data = {
        "range": {"from": dt_iso(from_ts), "to": dt_iso(to_ts)},
        "summary": summary,
        "stations": stations,
        "anomalies_summary": anomalies_summary,
        "stations_in_snapshot": len(stations),
        "stations_truncated": selection["stations_total"] > len(stations),
        "selection_rule": selection["selection_rule"],
    }
    return tool_result_ok("missing", "get_snapshot", data)
def handle_get_anomalies(args):
    """Run the ``get_anomalies`` tool for router-supplied *args*.

    Expected args:
        range: ``{"from": ..., "to": ...}`` (required). The span may be at most
               MAX_RANGE_DAYS_ANOMALIES days.
        types: list of anomaly types; a single string is also accepted. Only
               "spike" and "stuck" are kept, and both are used if none remain.
        limit: maximum number of rows. Clamped to MAX_ANOMALIES; missing,
               non-integer or non-positive values fall back to MAX_ANOMALIES.

    Returns:
        A ``tool_result_v1`` dict with placeholder id ``"missing"``, which
        ``execute_tool_call`` replaces.
    """
    # Router output is LLM-generated, so tolerate non-dict payloads.
    if not isinstance(args, dict):
        args = {}
    range_args = args.get("range")
    if not range_args or not isinstance(range_args, dict):
        return tool_result_error("missing", "get_anomalies", "INVALID_RANGE", "Range is required")
    from_ts = parse_dt(range_args.get("from"))
    to_ts = parse_dt(range_args.get("to"))
    code, message = validate_range(from_ts, to_ts, max_days=MAX_RANGE_DAYS_ANOMALIES)
    if code:
        return tool_result_error("missing", "get_anomalies", code, message)
    supported = ("spike", "stuck")
    requested = args.get("types") or list(supported)
    # A bare string would otherwise be iterated character by character and the
    # requested filter silently dropped.
    if isinstance(requested, str):
        requested = [requested]
    elif not isinstance(requested, (list, tuple, set)):
        requested = []
    allowed_types = [t for t in requested if t in supported] or list(supported)
    try:
        limit = int(args.get("limit") or MAX_ANOMALIES)
    except (TypeError, ValueError, OverflowError):
        limit = MAX_ANOMALIES
    # Treat non-positive limits like a missing one rather than passing them on.
    if limit < 1:
        limit = MAX_ANOMALIES
    limit = min(limit, MAX_ANOMALIES)
    data = fetch_anomalies(from_ts, to_ts, limit, None, allowed_types)
    return tool_result_ok("missing", "get_anomalies", data)
def handle_get_station_history(args):
    """Run the ``get_station_history`` tool for router-supplied *args*.

    Expected args:
        range:        ``{"from": ..., "to": ...}`` (required). The span may be at
                      most MAX_RANGE_DAYS_HISTORY days.
        station_id:   integer station id (required).
        step_seconds: sampling step (optional). Defaults to
                      MIN_HISTORY_STEP_SECONDS and is never allowed below it.

    Returns:
        A ``tool_result_v1`` dict with placeholder id ``"missing"``, which
        ``execute_tool_call`` replaces. An ``INVALID_ARGS`` error is returned
        when ``station_id`` is missing or not an integer.
    """
    # Router output is LLM-generated, so tolerate non-dict payloads.
    if not isinstance(args, dict):
        args = {}
    range_args = args.get("range")
    if not range_args or not isinstance(range_args, dict):
        return tool_result_error(
            "missing", "get_station_history", "INVALID_RANGE", "Range is required"
        )
    from_ts = parse_dt(range_args.get("from"))
    to_ts = parse_dt(range_args.get("to"))
    code, message = validate_range(from_ts, to_ts, max_days=MAX_RANGE_DAYS_HISTORY)
    if code:
        return tool_result_error("missing", "get_station_history", code, message)
    station_id = args.get("station_id")
    if station_id is None:
        return tool_result_error(
            "missing", "get_station_history", "INVALID_ARGS", "station_id is required"
        )
    try:
        station_id = int(station_id)
    except (TypeError, ValueError, OverflowError):
        return tool_result_error(
            "missing", "get_station_history", "INVALID_ARGS", "station_id must be an integer"
        )
    try:
        step_seconds = int(args.get("step_seconds") or MIN_HISTORY_STEP_SECONDS)
    except (TypeError, ValueError, OverflowError):
        step_seconds = MIN_HISTORY_STEP_SECONDS
    # Enforce the floor implied by the constant's name, so an LLM-chosen tiny
    # step cannot request an excessive number of points.
    # NOTE(review): fetch_station_history may already clamp; this is harmless if so.
    step_seconds = max(step_seconds, MIN_HISTORY_STEP_SECONDS)
    data = fetch_station_history(station_id, from_ts, to_ts, step_seconds)
    return tool_result_ok("missing", "get_station_history", data)
def execute_tool_call(decision):
    """Dispatch a ``tool_call_v1`` router decision to its tool handler.

    Args:
        decision: Router decision dict with ``tool``, an optional ``id`` and
            optional ``args``. A missing ``id`` is replaced by a generated
            ``call_<8 hex chars>``.

    Returns:
        The handler's ``tool_result_v1`` dict, or an ``INVALID_TOOL`` error for
        unsupported tools. In both cases ``id`` and ``tool`` are set from the
        decision.
    """
    tool_id = decision.get("id") or f"call_{uuid.uuid4().hex[:8]}"
    tool = decision.get("tool")
    args = decision.get("args") or {}
    # Router output is LLM-generated; handlers expect a dict of arguments.
    if not isinstance(args, dict):
        args = {}
    handlers = {
        "get_snapshot": handle_get_snapshot,
        "get_anomalies": handle_get_anomalies,
        "get_station_history": handle_get_station_history,
    }
    # The isinstance guard keeps an unhashable ``tool`` value from making the
    # dict lookup raise TypeError.
    handler = handlers.get(tool) if isinstance(tool, str) else None
    if handler is None:
        result = tool_result_error(tool_id, tool, "INVALID_TOOL", "Tool not supported")
    else:
        result = handler(args)
    # Handlers emit the placeholder id "missing"; stamp the real call identity.
    result["id"] = tool_id
    result["tool"] = tool
    return result
def build_answer(
    message,
    message_type,
    snapshot_json=None,
    tool_result=None,
    anomalies_brief=None,
    anomalies_mode=None,
):
    """Ask the answer LLM to respond to *message*, given the supplied context.

    The conversation is built from:
      * the system ``ANSWER_PROMPT``;
      * a user header with ``REQUEST_TYPE`` (upper-cased, ``UNKNOWN`` when
        absent), ``QUESTION`` and, if set, ``ANOMALIES_MODE``;
      * fenced JSON blocks, each as its own user message. ``SNAPSHOT_JSON`` is
        added when *snapshot_json* (already a JSON string) is truthy;
        ``TOOL_RESULT`` and ``ANOMALIES_BRIEF`` are added when not ``None``.

    Returns whatever ``call_llm`` returns (temperature 0.0, max 512 tokens).
    """
    header_lines = [
        f"REQUEST_TYPE: {(message_type or 'unknown').upper()}",
        f"QUESTION: {message}",
    ]
    if anomalies_mode:
        header_lines.append(f"ANOMALIES_MODE: {anomalies_mode}")

    sections = []
    if snapshot_json:
        sections.append(("SNAPSHOT_JSON", snapshot_json))
    if tool_result is not None:
        sections.append(("TOOL_RESULT", json.dumps(tool_result, ensure_ascii=False)))
    if anomalies_brief is not None:
        sections.append(("ANOMALIES_BRIEF", json.dumps(anomalies_brief, ensure_ascii=False)))

    conversation = [
        {"role": "system", "content": ANSWER_PROMPT},
        {"role": "user", "content": "\n".join(header_lines)},
    ]
    for label, body in sections:
        conversation.append(
            {"role": "user", "content": label + ":\n```json\n" + body + "\n```"}
        )
    return call_llm(conversation, temperature=0.0, max_tokens=512)
def build_trace_payload(
    message,
    request_type,
    decision,
    snapshot,
    snapshot_llm,
    tool_result,
    tool_result_llm,
    anomalies_brief=None,
    anomalies_mode=None,
    period=None,
):
    """Assemble a debug trace for an ``eco`` request; ``None`` for other types.

    ``llm_input`` records what was sent to the answer LLM:
      * the anomalies brief (plus ``anomalies_mode``, if set) when a brief exists;
      * otherwise the localized snapshot and/or tool result, whichever is not
        ``None``.
    An empty ``llm_input`` is reported as ``None``. ``raw_json`` is the raw
    tool result when present, otherwise the raw snapshot. Each call gets a
    fresh random ``trace_id`` (UUID4).
    """
    if request_type != "eco":
        return None
    if anomalies_brief is not None:
        llm_input = {"anomalies_brief": anomalies_brief}
        if anomalies_mode:
            llm_input["anomalies_mode"] = anomalies_mode
    else:
        candidates = (("snapshot", snapshot_llm), ("tool_result", tool_result_llm))
        llm_input = {name: value for name, value in candidates if value is not None}
    return {
        "trace_id": str(uuid.uuid4()),
        "request_type": request_type,
        "question": message,
        "router": decision,
        "llm_input": llm_input or None,
        "raw_json": snapshot if tool_result is None else tool_result,
        "period": period,
    }
@app.get("/health")
def health():
    """Liveness probe; always responds with ``{"ok": true}``."""
    return dict(ok=True)
@app.get("/aip/waqi/summary")
def waqi_summary(
    _auth: None = Depends(require_api_key),
    from_ts: str | None = Query(default=None, alias="from"),
    to_ts: str | None = Query(default=None, alias="to"),
):
    """Return the aggregated summary for a time window.

    ``to`` defaults to now (UTC). ``from`` defaults to 24 hours before ``to``.
    Responds 400 when ``from`` is after ``to``. A ``RuntimeError`` from the data
    layer becomes a 500.
    """
    try:
        window_end = parse_dt(to_ts) or datetime.now(timezone.utc)
        window_start = parse_dt(from_ts) or window_end - timedelta(hours=24)
        if window_end < window_start:
            raise HTTPException(status_code=400, detail="'from' must be <= 'to'")
        result = fetch_summary(window_end, window_start, window_end)
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return result
@app.get("/aip/waqi/anomalies")
def waqi_anomalies(
    _auth: None = Depends(require_api_key),
    from_ts: str | None = Query(default=None, alias="from"),
    to_ts: str | None = Query(default=None, alias="to"),
    limit: int = Query(default=200, ge=1, le=1000),
    station_id: int | None = Query(default=None),
    anomaly_type: str | None = Query(default=None, alias="type"),
):
    """List anomalies in a time window, optionally filtered by station and type.

    Query parameters:
      * ``to`` defaults to now (UTC); ``from`` defaults to 24 hours before ``to``.
      * ``limit`` must be 1..1000 and defaults to 200.

    Responds 400 when ``from`` is after ``to``. A ``RuntimeError`` from the data
    layer becomes a 500.
    """
    try:
        window_end = parse_dt(to_ts) or datetime.now(timezone.utc)
        window_start = parse_dt(from_ts) or window_end - timedelta(hours=24)
        if window_end < window_start:
            raise HTTPException(status_code=400, detail="'from' must be <= 'to'")
        result = fetch_anomalies(window_start, window_end, limit, station_id, anomaly_type)
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return result
@app.get("/aip/waqi/stations")
def waqi_stations(
    _auth: None = Depends(require_api_key),
):
    """Return the current station list with the freshness windows in use.

    ``windows`` echoes the module-level ingestion/observation thresholds, so the
    client can interpret the per-station freshness data. A ``RuntimeError``
    becomes a 500.
    """
    try:
        current = datetime.now(timezone.utc)
        windows = dict(
            ingested_minutes=INGESTED_WINDOW_MINUTES,
            observed_fresh_minutes=OBSERVED_FRESH_MINUTES,
            observed_recent_hours=OBSERVED_RECENT_HOURS,
        )
        response = {"now": dt_iso(current), "windows": windows}
        response["items"] = fetch_stations(current)
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return response
@app.get("/aip/waqi/history")
def waqi_history(
    _auth: None = Depends(require_api_key),
    date: str = Query(...),
    since: str | None = Query(default=None),
):
    """Return history rows for one calendar day.

    ``date`` is parsed with ``parse_date``; a 400 is returned when that yields
    ``None``. The day's UTC bounds come from ``day_bounds_utc``. When ``since``
    is given, it is parsed and passed on to ``fetch_history``. The raw ``date``
    string is echoed back. A ``RuntimeError`` becomes a 500.
    """
    try:
        day = parse_date(date)
        if day is None:
            raise HTTPException(status_code=400, detail="'date' is required")
        start, end = day_bounds_utc(day)
        since_dt = None if not since else parse_dt(since)
        rows = fetch_history(start, end, since_dt)
        bounds = {"from": dt_iso(start), "to": dt_iso(end)}
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"date": date, "range": bounds, "items": rows}
@app.get("/aip/waqi/history_bounds")
def waqi_history_bounds(
    _auth: None = Depends(require_api_key),
):
    """Return the earliest and latest history dates as local date keys.

    The response also names the UI time zone (``UI_TZ``). A ``RuntimeError``
    becomes a 500.
    """
    try:
        earliest, latest = fetch_history_bounds()
        bounds = dict(
            tz=UI_TZ,
            min_date=local_date_key(earliest),
            max_date=local_date_key(latest),
        )
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return bounds
@app.get("/aip/waqi/report")
def waqi_report(
    _auth: None = Depends(require_api_key),
):
    """Placeholder report endpoint; currently returns an empty JSON object.

    Uses the same API-key dependency as every other ``/aip/waqi/*`` route, so
    the path does not stay unauthenticated once a real report is added here.
    """
    return {}
@app.post("/llm/chat")
def llm_chat(payload: dict = Body(...)):
    """Answer a free-text question about air quality via router + tool + answer LLM.

    Request body keys read here: ``message`` (required), ``mode`` (default
    ``"live"``), ``from``/``to`` or ``range.from``/``range.to``, and
    ``station_focus`` or ``station_id``.

    Flow:
      1. Build a snapshot for the window (``to`` defaults to now, ``from`` to
         24 hours before ``to``).
      2. Ask the router which tool to call.
      3. For smalltalk/meta requests, answer directly and return early.
      4. Otherwise apply fallbacks, sanitize the tool call, execute it, and
         format the answer. Anomaly results are rendered either as a brief
         LLM summary or as an LLM intro plus a full deterministic list.

    Responds 400 for a missing message or ``from`` > ``to``. A
    ``RuntimeError`` becomes a 500.

    NOTE(review): unlike the /aip/waqi/* routes this endpoint has no
    ``require_api_key`` dependency — confirm that is intentional.
    NOTE(review): a non-dict ``range`` in the body raises AttributeError here,
    outside the try block.
    """
    message = str(payload.get("message") or "").strip()
    if not message:
        raise HTTPException(status_code=400, detail="'message' is required")
    mode = payload.get("mode") or "live"
    range_payload = payload.get("range") or {}
    # Top-level "from"/"to" take precedence over the nested "range" object.
    from_raw = payload.get("from") or range_payload.get("from")
    to_raw = payload.get("to") or range_payload.get("to")
    try:
        message_type = classify_message(message)
        to_dt = parse_dt(to_raw) or datetime.now(timezone.utc)
        from_dt = parse_dt(from_raw) or (to_dt - timedelta(hours=24))
        if from_dt > to_dt:
            raise HTTPException(status_code=400, detail="'from' must be <= 'to'")
        # An explicit station from the request wins. A non-integer value is
        # dropped; when nothing usable is given (None or 0), fall back to a
        # station id extracted from the message text.
        station_focus = payload.get("station_focus") or payload.get("station_id")
        if station_focus is not None:
            try:
                station_focus = int(station_focus)
            except (TypeError, ValueError):
                station_focus = None
        station_focus = station_focus or extract_station_id(message)
        now = datetime.now(timezone.utc)
        snapshot = build_snapshot(
            now,
            from_dt,
            to_dt,
            mode,
            station_focus,
            MAX_STATIONS_IN_SNAPSHOT,
        )
        # The router receives the raw snapshot JSON. The answer LLM receives
        # the version passed through localize_llm_times (presumably timestamps
        # converted to local time — verify).
        snapshot_llm = localize_llm_times(snapshot)
        snapshot_json = json.dumps(snapshot, ensure_ascii=False)
        snapshot_llm_json = json.dumps(snapshot_llm, ensure_ascii=False)
        decision = run_router(snapshot_json, message)
        anomalies_brief = None
        anomalies_mode = None
        # The router's request_type overrides the local classifier; "eco" is the
        # final default.
        effective_type = (decision.get("request_type") or message_type or "eco").lower()
        if effective_type in ("smalltalk", "meta"):
            # Non-eco questions: no tool call, answer from the prompt alone.
            # NOTE(review): this response has no "trace_payload" key, unlike
            # the eco response below.
            decision = {
                "type": "no_tool",
                "reason": "no_tool_for_smalltalk_or_meta",
                "request_type": effective_type,
            }
            answer = build_answer(message, effective_type)
            payload = {
                "request_id": snapshot["request_id"],
                "router": decision,
                "tool_result": None,
                "answer": answer,
                "period": None,
            }
            return JSONResponse(payload, media_type="application/json; charset=utf-8")
        message_type = "eco"
        decision = apply_fallbacks(decision, message, snapshot, station_focus, message_type)
        decision["request_type"] = "eco"
        if decision["type"] == "tool_call_v1" and decision.get("tool") == "get_anomalies":
            # Normalize anomaly-tool args before sanitizing:
            # - default the range to the snapshot's window;
            # - accept "max_anomalies" as an alias for "limit".
            args = decision.get("args") or {}
            if not isinstance(args, dict):
                args = {}
            if not args.get("range"):
                args["range"] = snapshot_range_args(snapshot)
            if "limit" not in args and "max_anomalies" in args:
                try:
                    args["limit"] = int(args.get("max_anomalies"))
                except (TypeError, ValueError):
                    pass
            decision["args"] = args
        decision = sanitize_tool_call(decision, station_focus, snapshot)
        tool_result = None
        if decision["type"] == "tool_call_v1":
            tool_result = execute_tool_call(decision)
        tool_result_llm = localize_llm_times(tool_result) if tool_result else None
        if tool_result and tool_result.get("ok") is False:
            # Tool failures are reported deterministically, without the LLM.
            answer = format_tool_error(tool_result)
        elif tool_result and tool_result.get("ok") and tool_result.get("tool") == "get_anomalies":
            items = (tool_result.get("data") or {}).get("items") or []
            if not items:
                answer = (
                    "За выбранный период аномалий не найдено. "
                    "Можно проверить 24 часа / 3 дня / другую станцию."
                )
            else:
                # The LLM only sees the compact brief, never the full item list.
                brief = build_anomalies_brief(tool_result, top_n=MAX_ANOMALIES_BRIEF)
                anomalies_brief = brief
                if wants_anomaly_full_list(message):
                    # "expand": LLM-written intro followed by the
                    # deterministic full list.
                    anomalies_mode = "expand"
                    full_list = format_anomalies_full_list(
                        tool_result,
                        max_items=MAX_ANOMALIES_FULL_LIST,
                        chrono=wants_anomaly_chrono(message),
                    )
                    intro = build_answer(
                        message,
                        message_type,
                        anomalies_brief=brief,
                        anomalies_mode="expand",
                    )
                    answer = f"{intro}\n{full_list}".strip()
                else:
                    # "brief": the LLM summarizes the brief on its own.
                    anomalies_mode = "brief"
                    answer = build_answer(
                        message,
                        message_type,
                        anomalies_brief=brief,
                        anomalies_mode="brief",
                    )
        elif tool_result_is_no_data(tool_result):
            answer = "нет данных"
        else:
            # Generic path: the answer LLM gets the localized snapshot and the
            # localized tool result (if any).
            answer = build_answer(message, message_type, snapshot_llm_json, tool_result_llm)
        answer = strip_iso_period_lines(answer)
        period = build_period_payload(snapshot, tool_result)
        trace_payload = build_trace_payload(
            message,
            "eco",
            decision,
            snapshot,
            snapshot_llm,
            tool_result,
            tool_result_llm,
            anomalies_brief=anomalies_brief,
            anomalies_mode=anomalies_mode,
            period=period,
        )
        payload = {
            "request_id": snapshot["request_id"],
            "router": decision,
            "tool_result": tool_result,
            "answer": answer,
            "period": period,
            "trace_payload": trace_payload,
        }
        return JSONResponse(payload, media_type="application/json; charset=utf-8")
    except RuntimeError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc