import fs from "fs";
import path from "path";
import { nanoid } from "nanoid";
import { Router } from "express";
import { ASSISTANT_SESSIONS_DIR, EVAL_CASES_DIR, EVAL_DATASETS_DIR } from "../config";
import type { AppServices } from "../serverContext";
import { ApiError, ok } from "../utils/http";
import type { EvalRunMode, NormalizeRequestPayload } from "../types/normalizer";
import type { EvalTarget } from "../types/assistantEval";
import { repairAddressMojibakeText } from "../services/addressTextRepair";

/** Lifecycle state shared by async eval jobs and by the individual cases inside them. */
type EvalAsyncStatus = "queued" | "running" | "completed" | "failed" | "canceled";

/**
 * Normalizer settings forwarded from the request body to the eval service (cast, not validated).
 * NOTE(review): confirm which keys evalService.run expects to be omitted from
 * NormalizeRequestPayload; omitting none keeps this assignable to any narrower
 * `Omit<NormalizeRequestPayload, K>` the service declares.
 */
type EvalNormalizeConfig = Omit<NormalizeRequestPayload, never>;

/** Per-case progress of an async job, including the session transcript read back from disk. */
interface EvalAsyncCaseInfo {
  case_id: string;
  turns_total: number;
  status: EvalAsyncStatus;
  messages: Array<{
    message_id: string | null;
    role: string;
    text: string;
    created_at: string | null;
    trace_id: string | null;
    reply_type: string | null;
    message_index: number;
    case_id: string;
    case_message_index: number;
  }>;
}

/** In-memory record of one async eval run started via POST /api/eval/run-async/start. */
interface EvalAsyncJob {
  job_id: string;
  status: EvalAsyncStatus;
  created_at: string;
  updated_at: string;
  eval_target: EvalTarget;
  run_id: string;
  case_set_file: string | null;
  analysis_date: string | null;
  total_cases: number;
  completed_cases: number;
  cases: EvalAsyncCaseInfo[];
  error: string | null;
  report: Record<string, unknown> | null;
  /** Aborts the in-flight eval run; set to null once the run settles or the job is canceled/evicted. */
  abort_controller?: AbortController | null;
}

/** Module-level job registry (held in process memory only). */
const ASYNC_JOBS = new Map<string, EvalAsyncJob>();
/** Maximum number of jobs kept in ASYNC_JOBS; see trimAsyncJobsStore. */
const MAX_ASYNC_JOBS = 80;

/** Returns `value` as a plain object record, or null for falsy values, non-objects and arrays. */
function toRecord(value: unknown): Record<string, unknown> | null {
  if (!value || typeof value !== "object" || Array.isArray(value)) {
    return null;
  }
  return value as Record<string, unknown>;
}

/** Returns the trimmed string, or null when `value` is not a string or trims to empty. */
function toStringSafe(value: unknown): string | null {
  if (typeof value !== "string") {
    return null;
  }
  const trimmed = value.trim();
  return trimmed.length > 0 ? trimmed : null;
}

/** Returns `value` when it is an array, otherwise an empty array. */
function toArray(value: unknown): unknown[] {
  return Array.isArray(value) ? value : [];
}

/** Repairs mojibake, turns CR/tab into spaces, collapses whitespace runs and trims. */
function normalizeQuestionChunk(value: string): string {
  return repairAddressMojibakeText(String(value ?? ""))
    .replace(/\r/g, " ")
    .replace(/\t/g, " ")
    .replace(/\s+/g, " ")
    .trim();
}

/** Header-like lines ("question(s)", "вопрос(ы)", "список вопросов" = "list of questions") that are not questions. */
const RUNTIME_QUESTION_PLACEHOLDER_PATTERN = /^(?:questions?|вопросы?|список\s+вопросов)$/iu;
/**
 * Short qualifier phrases (e.g. "без воды" = "no fluff", "коротко" = "briefly",
 * "за весь период" = "for the whole period") that belong to the preceding question.
 */
const RUNTIME_QUESTION_TAIL_PATTERNS: RegExp[] = [
  /^(?:без\s+воды|по\s+факту|и\s+коротко|коротко|прям(?:\s+)?сейчас|за\s+весь\s+период|по\s+делу)\??$/iu
];
/**
 * Leading list marker: a bullet (`-`, `*`, `•`) or a 1-3 digit number followed by `)`, `.` or `:`.
 * The punctuation is required and must not be followed by another digit, so lines that merely
 * start with a number ("2023 год…", "10 крупнейших…", "1.5 млн…") keep their leading digits.
 */
const LIST_MARKER_PATTERN = /^\s*(?:[-*•]|\d{1,3}[).:](?!\d))\s*/;
/** A multi-question line is split only if every piece is at least this many characters long. */
const MIN_SPLIT_QUESTION_LENGTH = 18;

/** Normalizes a chunk and removes trailing `?!.:,;` punctuation. */
function stripQuestionSuffix(value: string): string {
  return normalizeQuestionChunk(value).replace(/[?!.:,;]+$/u, "").trim();
}

/** True when the chunk (ignoring trailing punctuation) is only a "questions" header. */
function isRuntimeQuestionPlaceholder(value: string): boolean {
  const core = stripQuestionSuffix(value).toLowerCase();
  return core.length > 0 && RUNTIME_QUESTION_PLACEHOLDER_PATTERN.test(core);
}

/** True when the chunk looks like a qualifier/tail that should be merged into the previous question. */
function isLikelyRuntimeQuestionTail(value: string): boolean {
  const core = stripQuestionSuffix(value).toLowerCase();
  if (!core) {
    return false;
  }
  if (isRuntimeQuestionPlaceholder(core)) {
    return true;
  }
  return RUNTIME_QUESTION_TAIL_PATTERNS.some((pattern) => pattern.test(core));
}

/** Joins `tail` onto `baseQuestion` as one question ending in a single "?" (empty if both are empty). */
function mergeRuntimeQuestionTail(baseQuestion: string, tail: string): string {
  const base = stripQuestionSuffix(baseQuestion);
  const suffix = stripQuestionSuffix(tail);
  if (!base) {
    return suffix ? `${suffix}?` : "";
  }
  if (!suffix) {
    return `${base}?`;
  }
  return `${base} ${suffix}?`.replace(/\s+/g, " ").trim();
}

/**
 * Normalizes each chunk, drops empty chunks and placeholder headers, and folds tail-like
 * qualifiers into the preceding question. A tail with no preceding question is kept as-is.
 */
function normalizeRuntimeQuestionList(items: string[]): string[] {
  const normalized: string[] = [];
  for (const item of items) {
    const chunk = normalizeQuestionChunk(item);
    if (!chunk) {
      continue;
    }
    if (isRuntimeQuestionPlaceholder(chunk)) {
      continue;
    }
    if (isLikelyRuntimeQuestionTail(chunk) && normalized.length > 0) {
      const merged = mergeRuntimeQuestionTail(normalized[normalized.length - 1], chunk);
      if (merged) {
        normalized[normalized.length - 1] = merged;
      }
      continue;
    }
    normalized.push(chunk);
  }
  return normalized.filter((item) => item.length > 0);
}

/**
 * Splits one raw free-text entry into individual questions.
 *
 * Multi-line input is split per line and each line loses its list marker; single-line input is
 * kept whole (markers included). A line containing several "?"-terminated sentences is split
 * only when every piece is a real question (not a placeholder/tail) of at least
 * MIN_SPLIT_QUESTION_LENGTH characters; otherwise the line stays intact.
 */
function splitQuestionCandidate(raw: string): string[] {
  const normalized = repairAddressMojibakeText(String(raw ?? "")).replace(/\r/g, "\n").trim();
  if (!normalized) {
    return [];
  }
  const byLines = normalized
    .split(/\n+/g)
    .map((line) => line.replace(LIST_MARKER_PATTERN, "").trim())
    .filter((line) => line.length > 0);
  const source = byLines.length > 1 ? byLines : [normalized];
  const chunks: string[] = [];
  for (const line of source) {
    const normalizedLine = normalizeQuestionChunk(line);
    if (!normalizedLine || isRuntimeQuestionPlaceholder(normalizedLine)) {
      continue;
    }
    const questionLike = Array.from(line.matchAll(/[^?]+(?:\?|$)/g))
      .map((match) => normalizeQuestionChunk(match[0]))
      .filter((item) => item.length > 0);
    if (questionLike.length > 1) {
      const canSafelySplit = questionLike.every(
        (item) =>
          !isRuntimeQuestionPlaceholder(item) &&
          !isLikelyRuntimeQuestionTail(item) &&
          normalizeQuestionChunk(item).length >= MIN_SPLIT_QUESTION_LENGTH
      );
      if (canSafelySplit) {
        for (const item of questionLike) {
          chunks.push(item.endsWith("?") ? item : `${item}?`);
        }
      } else {
        chunks.push(normalizedLine);
      }
      continue;
    }
    chunks.push(normalizedLine);
  }
  return normalizeRuntimeQuestionList(chunks);
}

/**
 * Turns an untrusted `questions`-style payload into a clean list of question strings.
 *
 * Non-string and blank entries are ignored.
 *
 * @param options.splitCandidates default true; split entries via splitQuestionCandidate,
 *   otherwise only normalize whitespace.
 * @param options.dedupe default true; drop repeated questions while keeping first-seen order.
 */
function normalizeRuntimeQuestions(value: unknown, options?: { dedupe?: boolean; splitCandidates?: boolean }): string[] {
  const raw = toArray(value)
    .map((item) => (typeof item === "string" ? item.trim() : ""))
    .filter((item) => item.length > 0);
  if (raw.length === 0) {
    return [];
  }
  const splitCandidates = options?.splitCandidates ?? true;
  const expanded = splitCandidates
    ? normalizeRuntimeQuestionList(raw.flatMap((item) => splitQuestionCandidate(item)))
    : raw.map((item) => normalizeQuestionChunk(item)).filter((item): item is string => Boolean(item));
  const dedupe = options?.dedupe ?? true;
  if (!dedupe) {
    return expanded;
  }
  const deduped: string[] = [];
  const seen = new Set<string>();
  for (const item of expanded) {
    const normalized = normalizeQuestionChunk(item);
    if (!normalized) continue;
    if (seen.has(normalized)) continue;
    seen.add(normalized);
    deduped.push(normalized);
  }
  return deduped;
}

export const __evalRouteTestUtils = { splitQuestionCandidate, normalizeRuntimeQuestions };
// Function declarations below are hoisted, so these references resolve at module load.
export const __evalRouteAsyncTestUtils = { readReportedCaseIds, syncJobWithSessions };

/** Returns the trimmed non-empty string entries of `value`, or undefined if there are none. */
function normalizeCaseIds(value: unknown): string[] | undefined {
  if (!Array.isArray(value)) {
    return undefined;
  }
  const normalized = value
    .map((item) => (typeof item === "string" ? item.trim() : ""))
    .filter((item) => item.length > 0);
  return normalized.length > 0 ? normalized : undefined;
}

/** Accepts only a real calendar date in strict `YYYY-MM-DD` form; anything else yields undefined. */
function normalizeAnalysisDate(value: unknown): string | undefined {
  if (typeof value !== "string") {
    return undefined;
  }
  const trimmed = value.trim();
  const match = trimmed.match(/^(\d{4})-(\d{2})-(\d{2})$/);
  if (!match) {
    return undefined;
  }
  const year = Number(match[1]);
  const month = Number(match[2]);
  const day = Number(match[3]);
  if (!Number.isFinite(year) || !Number.isFinite(month) || !Number.isFinite(day)) {
    return undefined;
  }
  // Round-trip through UTC to reject dates like 2024-02-30 that Date would silently roll over.
  const candidate = new Date(Date.UTC(year, month - 1, day));
  if (
    candidate.getUTCFullYear() !== year ||
    candidate.getUTCMonth() + 1 !== month ||
    candidate.getUTCDate() !== day
  ) {
    return undefined;
  }
  return `${match[1]}-${match[2]}-${match[3]}`;
}

/**
 * Maps a raw request body onto the eval-service payload.
 *
 * Accepts both snake_case and camelCase spellings for the analysis date and the comparison
 * baseline file. `mode` defaults to "standard" and `eval_target` to "normalizer"; neither is
 * validated here.
 */
function buildEvalPayloadFromBody(body: Record<string, unknown>): {
  normalizeConfig: EvalNormalizeConfig;
  caseIds?: string[];
  useMock: boolean;
  mode: EvalRunMode;
  caseSetFile?: string;
  rawQuestions?: string;
  evalTarget: EvalTarget;
  compareWithReportFile?: string;
  analysisDate?: string;
} {
  const analysisDate = normalizeAnalysisDate(body.analysis_date) ?? normalizeAnalysisDate(body.analysisDate);
  return {
    normalizeConfig: (body.normalizeConfig ?? {}) as EvalNormalizeConfig,
    caseIds: normalizeCaseIds(body.caseIds),
    useMock: Boolean(body.useMock),
    mode: (body.mode as EvalRunMode | undefined) ?? "standard",
    caseSetFile: typeof body.caseSetFile === "string" ? body.caseSetFile : undefined,
    rawQuestions: typeof body.rawQuestions === "string" ? body.rawQuestions : undefined,
    evalTarget: (body.eval_target as EvalTarget | undefined) ?? "normalizer",
    compareWithReportFile:
      typeof body.compare_with_report_file === "string"
        ? body.compare_with_report_file
        : typeof body.comparisonBaselineReportFile === "string"
          ? body.comparisonBaselineReportFile
          : undefined,
    analysisDate
  };
}

/**
 * Resolves a case-set path.
 *
 * Absolute paths are returned as-is. Relative paths are tried against EVAL_CASES_DIR,
 * EVAL_DATASETS_DIR and then the process working directory. The first existing candidate
 * wins; if none exists, the EVAL_CASES_DIR candidate is returned.
 */
function resolveReadablePath(inputPath: string): string {
  if (path.isAbsolute(inputPath)) {
    return inputPath;
  }
  const candidates = [
    path.resolve(EVAL_CASES_DIR, inputPath),
    path.resolve(EVAL_DATASETS_DIR, inputPath),
    path.resolve(inputPath)
  ];
  for (const candidate of candidates) {
    if (fs.existsSync(candidate)) {
      return candidate;
    }
  }
  return candidates[0];
}

/**
 * Reads a case-set JSON file (UTF-8 BOM tolerated) and returns, for each case with a `case_id`
 * and at least one turn, its id and turn count.
 *
 * @throws when the file cannot be read or is not valid JSON.
 */
function readAssistantSuiteCaseSeeds(inputPath: string): Array<{ case_id: string; turns_total: number }> {
  const filePath = resolveReadablePath(inputPath);
  const raw = fs.readFileSync(filePath, "utf-8").replace(/^\uFEFF/, "");
  const parsed = JSON.parse(raw) as unknown;
  const record = toRecord(parsed);
  const cases = toArray(record?.cases);
  return cases
    .map((item) => toRecord(item))
    .filter((item): item is Record<string, unknown> => item !== null)
    .map((item) => {
      const caseId = toStringSafe(item.case_id);
      const turns = toArray(item.turns);
      if (!caseId || turns.length === 0) {
        return null;
      }
      return { case_id: caseId, turns_total: turns.length };
    })
    .filter((item): item is { case_id: string; turns_total: number } => item !== null);
}

/**
 * Writes a one-case-per-question suite (`AUTO-001`, `AUTO-002`, …) into EVAL_CASES_DIR,
 * creating the directory if needed.
 *
 * @returns the file name, relative to EVAL_CASES_DIR.
 */
function writeRuntimeAssistantSuiteFromQuestions(jobId: string, questions: string[]): string {
  if (!fs.existsSync(EVAL_CASES_DIR)) {
    fs.mkdirSync(EVAL_CASES_DIR, { recursive: true });
  }
  const cases = questions.map((question, index) => {
    const caseId = `AUTO-${String(index + 1).padStart(3, "0")}`;
    return {
      case_id: caseId,
      scenario_tag: "autogen_runtime",
      question_type: "direct",
      broadness_level: "medium",
      turns: [{ user_message: question }]
    };
  });
  const payload = {
    suite_id: `assistant_autogen_runtime_${jobId}`,
    suite_version: "0.1.0",
    schema_version: "assistant_autogen_runtime_v0_1",
    scenario_count: cases.length,
    case_ids: cases.map((item) => item.case_id),
    cases
  };
  const fileName = `assistant_autogen_runtime_${jobId}.json`;
  fs.writeFileSync(path.resolve(EVAL_CASES_DIR, fileName), JSON.stringify(payload, null, 2), "utf-8");
  return fileName;
}

/**
 * Writes a suite holding a single multi-turn case (`SAVED-001`) whose turns are `questions` in
 * order. An empty `questions` list yields an empty suite.
 *
 * @returns the file name, relative to EVAL_CASES_DIR.
 */
function writeRuntimeAssistantScenarioSuiteFromQuestions(jobId: string, questions: string[], title?: string): string {
  if (!fs.existsSync(EVAL_CASES_DIR)) {
    fs.mkdirSync(EVAL_CASES_DIR, { recursive: true });
  }
  const turns = questions.map((question) => ({ user_message: question }));
  const payload = {
    suite_id: `assistant_saved_session_runtime_${jobId}`,
    suite_version: "0.1.0",
    schema_version: "assistant_saved_session_runtime_v0_1",
    title: typeof title === "string" ? title.trim() || null : null,
    scenario_count: turns.length > 0 ? 1 : 0,
    case_ids: turns.length > 0 ? ["SAVED-001"] : [],
    cases:
      turns.length > 0
        ? [
            {
              case_id: "SAVED-001",
              scenario_tag: "saved_user_sessions_runtime",
              title: typeof title === "string" ? title.trim() || null : null,
              question_type: turns.length > 1 ? "followup" : "direct",
              broadness_level: "medium",
              turns
            }
          ]
        : []
  };
  const fileName = `assistant_saved_session_runtime_${jobId}.json`;
  fs.writeFileSync(path.resolve(EVAL_CASES_DIR, fileName), JSON.stringify(payload, null, 2), "utf-8");
  return fileName;
}

/**
 * Loads the conversation stored at `ASSISTANT_SESSIONS_DIR/<runId>-<caseId>.json`.
 * A missing or unparseable file yields an empty list. Message text is trimmed, and missing
 * fields become null (or "unknown" for role, "" for text).
 */
function readSessionConversation(runId: string, caseId: string): EvalAsyncCaseInfo["messages"] {
  const sessionId = `${runId}-${caseId}`;
  const filePath = path.resolve(ASSISTANT_SESSIONS_DIR, `${sessionId}.json`);
  if (!fs.existsSync(filePath)) {
    return [];
  }
  try {
    const parsed = JSON.parse(fs.readFileSync(filePath, "utf-8")) as unknown;
    const record = toRecord(parsed);
    const conversation = toArray(record?.conversation)
      .map((item) => toRecord(item))
      .filter((item): item is Record<string, unknown> => item !== null);
    return conversation.map((item, index) => ({
      message_id: toStringSafe(item.message_id),
      role: toStringSafe(item.role) ?? "unknown",
      text: toStringSafe(item.text) ?? "",
      created_at: toStringSafe(item.created_at),
      trace_id: toStringSafe(item.trace_id),
      reply_type: toStringSafe(item.reply_type),
      message_index: index,
      case_id: caseId,
      case_message_index: index
    }));
  } catch {
    // Best-effort progress view: a session file mid-write or corrupt just shows as empty.
    return [];
  }
}

/** Collects the `case_id` values listed under `job.report.results` (empty when there is no report). */
function readReportedCaseIds(job: EvalAsyncJob): Set<string> {
  const output = new Set<string>();
  const reportRecord = toRecord(job.report);
  const reportResults = toArray(reportRecord?.results)
    .map((item) => toRecord(item))
    .filter((item): item is Record<string, unknown> => item !== null);
  for (const result of reportResults) {
    const caseId = toStringSafe(result.case_id);
    if (!caseId) continue;
    output.add(caseId);
  }
  return output;
}

/** True for statuses that will not change on their own: completed, failed, canceled. */
function isTerminalCaseStatus(status: EvalAsyncStatus): boolean {
  return status === "completed" || status === "failed" || status === "canceled";
}

/**
 * Refreshes per-case progress of an assistant_* job from its session files and report, in place.
 *
 * - A case is "completed" once its transcript has at least `turns_total` assistant messages
 *   (with `turns_total > 0`), or once the report lists it.
 * - If both the case and the job are terminal, the case keeps its status and still counts
 *   toward `completed_cases`.
 * - Otherwise, any transcript message makes the case "running"; no messages make it "queued".
 * - A "running" job with no running cases and every case counted is promoted to "completed".
 *
 * Jobs with no run_id or a non-assistant target are left untouched.
 */
function syncJobWithSessions(job: EvalAsyncJob): void {
  if (!job.run_id || !job.eval_target.startsWith("assistant_")) {
    return;
  }
  const reportCaseIds = readReportedCaseIds(job);
  let completed = 0;
  let hasRunning = false;
  for (const item of job.cases) {
    const messages = readSessionConversation(job.run_id, item.case_id);
    item.messages = messages;
    const assistantMessages = messages.filter((entry) => entry.role === "assistant").length;
    const reportMarkedDone = reportCaseIds.has(item.case_id);
    if ((assistantMessages >= item.turns_total && item.turns_total > 0) || reportMarkedDone) {
      item.status = "completed";
      completed += 1;
      continue;
    }
    if (isTerminalCaseStatus(item.status) && isTerminalCaseStatus(job.status)) {
      completed += 1;
      continue;
    }
    if (messages.length > 0) {
      item.status = "running";
      hasRunning = true;
      continue;
    }
    item.status = "queued";
  }
  job.completed_cases = completed;
  if (job.status === "running" && !hasRunning && completed === job.total_cases && job.total_cases > 0) {
    job.status = "completed";
  }
}

/**
 * Evicts jobs until at most MAX_ASYNC_JOBS remain.
 *
 * Finished jobs go first, oldest `updated_at` first; in-flight jobs are evicted only as a last
 * resort. An evicted job can no longer be polled or canceled, so its background run is aborted
 * instead of being left running unreachable.
 */
function trimAsyncJobsStore(): void {
  if (ASYNC_JOBS.size <= MAX_ASYNC_JOBS) return;
  const byAge = (a: EvalAsyncJob, b: EvalAsyncJob): number => Date.parse(a.updated_at) - Date.parse(b.updated_at);
  const jobs = Array.from(ASYNC_JOBS.values());
  const evictionOrder = [
    ...jobs.filter((job) => isTerminalCaseStatus(job.status)).sort(byAge),
    ...jobs.filter((job) => !isTerminalCaseStatus(job.status)).sort(byAge)
  ];
  for (const job of evictionOrder) {
    if (ASYNC_JOBS.size <= MAX_ASYNC_JOBS) break;
    job.abort_controller?.abort();
    job.abort_controller = null;
    ASYNC_JOBS.delete(job.job_id);
  }
}

/** Serializable view of a job: the full report is replaced by a small summary and the AbortController is omitted. */
function snapshotJob(job: EvalAsyncJob): Record<string, unknown> {
  const report = job.report;
  let reportSummary: Record<string, unknown> | null = null;
  if (report) {
    const metrics = toRecord(report.metrics);
    const directScore = report.score_index;
    const metricsScore = metrics ? metrics.score_index : undefined;
    const casesTotal = report.cases_total;
    reportSummary = {
      run_id: toStringSafe(report.run_id),
      run_timestamp: toStringSafe(report.run_timestamp) ?? toStringSafe(report.timestamp),
      // Top-level score_index wins; older reports keep it under metrics.
      score_index:
        typeof directScore === "number" ? directScore : typeof metricsScore === "number" ? metricsScore : null,
      cases_total: typeof casesTotal === "number" ? casesTotal : null,
      analysis_date: toStringSafe(report.analysis_date) ?? job.analysis_date
    };
  }
  return {
    job_id: job.job_id,
    status: job.status,
    created_at: job.created_at,
    updated_at: job.updated_at,
    eval_target: job.eval_target,
    run_id: job.run_id,
    case_set_file: job.case_set_file,
    analysis_date: job.analysis_date,
    total_cases: job.total_cases,
    completed_cases: job.completed_cases,
    error: job.error,
    cases: job.cases,
    report_summary: reportSummary
  };
}

/**
 * Builds the eval router:
 * - POST /api/eval/run: runs an eval synchronously and returns the report.
 * - POST /api/eval/run-async/start: queues an assistant_stage1 run and returns a job snapshot.
 * - GET /api/eval/run-async/:job_id: returns refreshed progress for a job.
 * - POST /api/eval/run-async/:job_id/cancel: cancels a non-terminal job.
 */
export function buildEvalRouter(services: AppServices): Router {
  const router = Router();

  router.post("/api/eval/run", async (req, res, next) => {
    try {
      const body = (req.body ?? {}) as Record<string, unknown>;
      const payload = buildEvalPayloadFromBody(body);
      const report = await services.evalService.run(payload);
      ok(res, { ok: true, report });
    } catch (error) {
      next(error);
    }
  });

  router.post("/api/eval/run-async/start", async (req, res, next) => {
    try {
      const body = (req.body ?? {}) as Record<string, unknown>;
      const payload = buildEvalPayloadFromBody(body);
      if (payload.evalTarget !== "assistant_stage1") {
        throw new ApiError("UNSUPPORTED_ASYNC_EVAL_TARGET", "Async eval currently supports assistant_stage1 only.", 400);
      }
      const questions = normalizeRuntimeQuestions(body.questions);
      // Scenario turns are kept verbatim and in order (no splitting, no dedupe): they form one conversation.
      const scenarioQuestions = normalizeRuntimeQuestions(body.scenarioQuestions, { dedupe: false, splitCandidates: false });
      const scenarioTitleRaw = toStringSafe(body.scenarioTitle);
      const scenarioTitle = scenarioTitleRaw ? repairAddressMojibakeText(scenarioTitleRaw) : null;
      const jobId = `job-${nanoid(10)}`;
      const runId = `assistant-stage1-${nanoid(10)}`;
      // Precedence: scenarioQuestions > questions > caseSetFile.
      const runtimeCaseSetFile =
        scenarioQuestions.length > 0
          ? writeRuntimeAssistantScenarioSuiteFromQuestions(jobId, scenarioQuestions, scenarioTitle ?? undefined)
          : questions.length > 0
            ? writeRuntimeAssistantSuiteFromQuestions(jobId, questions)
            : payload.caseSetFile
              ? payload.caseSetFile
              : undefined;
      if (!runtimeCaseSetFile) {
        throw new ApiError(
          "ASYNC_CASESET_REQUIRED",
          "Async assistant_stage1 run requires caseSetFile, scenarioQuestions[] or explicit questions[] payload.",
          400
        );
      }
      let caseSeeds: Array<{ case_id: string; turns_total: number }>;
      try {
        caseSeeds = readAssistantSuiteCaseSeeds(runtimeCaseSetFile);
      } catch {
        // A missing or malformed client-supplied case-set is a bad request, not a server fault.
        throw new ApiError("ASYNC_CASESET_UNREADABLE", `Cannot read case-set file: ${runtimeCaseSetFile}`, 400);
      }
      if (caseSeeds.length === 0) {
        throw new ApiError("ASYNC_CASESET_EMPTY", "No runnable cases found in selected case-set.", 400);
      }
      const nowIso = new Date().toISOString();
      const abortController = new AbortController();
      const job: EvalAsyncJob = {
        job_id: jobId,
        status: "queued",
        created_at: nowIso,
        updated_at: nowIso,
        eval_target: payload.evalTarget,
        run_id: runId,
        case_set_file: runtimeCaseSetFile,
        analysis_date: payload.analysisDate ?? null,
        total_cases: caseSeeds.length,
        completed_cases: 0,
        cases: caseSeeds.map((item) => ({
          case_id: item.case_id,
          turns_total: item.turns_total,
          status: "queued",
          messages: []
        })),
        error: null,
        report: null,
        abort_controller: abortController
      };
      ASYNC_JOBS.set(job.job_id, job);
      trimAsyncJobsStore();

      // Run after the response is sent. Re-read the job from the store after each await:
      // it may have been canceled or evicted in the meantime.
      setImmediate(() => {
        void (async () => {
          const target = ASYNC_JOBS.get(job.job_id);
          if (!target) return;
          if (target.status === "canceled") {
            return;
          }
          target.status = "running";
          target.updated_at = new Date().toISOString();
          try {
            const report = await services.evalService.run({
              ...payload,
              caseSetFile: runtimeCaseSetFile,
              runId,
              abortSignal: abortController.signal
            });
            const latestAfterRun = ASYNC_JOBS.get(job.job_id);
            if (!latestAfterRun) {
              return;
            }
            if (latestAfterRun.status === "canceled") {
              latestAfterRun.updated_at = new Date().toISOString();
              return;
            }
            latestAfterRun.report = toRecord(report);
            syncJobWithSessions(latestAfterRun);
            latestAfterRun.completed_cases = latestAfterRun.total_cases;
            latestAfterRun.status = "completed";
            latestAfterRun.updated_at = new Date().toISOString();
            latestAfterRun.abort_controller = null;
          } catch (error) {
            const latestAfterError = ASYNC_JOBS.get(job.job_id);
            if (!latestAfterError) {
              return;
            }
            if (latestAfterError.status === "canceled") {
              latestAfterError.updated_at = new Date().toISOString();
              latestAfterError.abort_controller = null;
              return;
            }
            syncJobWithSessions(latestAfterError);
            latestAfterError.status = "failed";
            latestAfterError.error = error instanceof Error ? error.message : String(error);
            latestAfterError.updated_at = new Date().toISOString();
            latestAfterError.abort_controller = null;
          }
        })();
      });
      ok(res, { ok: true, job: snapshotJob(job) });
    } catch (error) {
      next(error);
    }
  });

  router.get("/api/eval/run-async/:job_id", (req, res, next) => {
    try {
      const jobId = String(req.params.job_id ?? "").trim();
      if (!jobId) {
        throw new ApiError("INVALID_ASYNC_JOB_ID", "job_id is required.", 400);
      }
      const job = ASYNC_JOBS.get(jobId);
      if (!job) {
        throw new ApiError("ASYNC_JOB_NOT_FOUND", `Async eval job not found: ${jobId}`, 404);
      }
      syncJobWithSessions(job);
      // Bumping updated_at on every poll also makes actively polled jobs the last to be trimmed.
      job.updated_at = new Date().toISOString();
      ok(res, { ok: true, job: snapshotJob(job) });
    } catch (error) {
      next(error);
    }
  });

  router.post("/api/eval/run-async/:job_id/cancel", (req, res, next) => {
    try {
      const jobId = String(req.params.job_id ?? "").trim();
      if (!jobId) {
        throw new ApiError("INVALID_ASYNC_JOB_ID", "job_id is required.", 400);
      }
      const job = ASYNC_JOBS.get(jobId);
      if (!job) {
        throw new ApiError("ASYNC_JOB_NOT_FOUND", `Async eval job not found: ${jobId}`, 404);
      }
      // Canceling an already-finished job is a no-op; the current snapshot is returned either way.
      if (!isTerminalCaseStatus(job.status)) {
        job.status = "canceled";
        job.error = "Остановлено оператором.";
        job.updated_at = new Date().toISOString();
        job.abort_controller?.abort();
        job.abort_controller = null;
        job.cases = job.cases.map((item) =>
          item.status === "completed" ? item : { ...item, status: "canceled" }
        );
        syncJobWithSessions(job);
      }
      ok(res, { ok: true, job: snapshotJob(job) });
    } catch (error) {
      next(error);
    }
  });

  return router;
}