ФУНКЦИИ - МЕЖПРОЕКТНАЯ КОММУНИКАЦИЯ: backlog-очередь Voice Tasker

This commit is contained in:
DCCONSTRUCTIONS 2026-04-28 20:10:05 +03:00
parent 5e57786d39
commit 0cda486b05
4 changed files with 218 additions and 34 deletions

View File

@ -67,7 +67,9 @@ VOICE_TASK_CONTEXT_LIMIT = 100
VOICE_TASK_RATE_LIMIT_HOURLY_WINDOW_SECONDS = 60 * 60
VOICE_TASK_RATE_LIMIT_DAILY_WINDOW_SECONDS = 24 * 60 * 60
VOICE_TASK_CONCURRENCY_RETRY_AFTER_SECONDS = 15
VOICE_TASK_OPENAI_RETRY_AFTER_SECONDS = 30
VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS = 30 * 60
VOICE_TASK_MAX_QUEUED_SESSIONS_PER_WORKSPACE = 100
VOICE_TASK_MONITOR_WINDOW_SECONDS = 24 * 60 * 60
VOICE_TASK_MONITOR_RECENT_LIMIT = 20
VOICE_TASK_RETENTION_BATCH_SIZE = 500
@ -79,15 +81,20 @@ VOICE_TASK_RATE_LIMIT_ERROR_CODES = {
"voice_task_project_daily_limit_exceeded",
}
VOICE_TASK_CONCURRENCY_ERROR_CODES = {"voice_task_workspace_concurrency_limit_exceeded"}
VOICE_TASK_LIMIT_EXCLUDED_ERROR_CODES = VOICE_TASK_RATE_LIMIT_ERROR_CODES | VOICE_TASK_CONCURRENCY_ERROR_CODES
VOICE_TASK_ACTIVE_SESSION_STATUSES = {
VoiceTaskSession.Status.QUEUED,
VOICE_TASK_QUEUE_ERROR_CODES = {"voice_task_workspace_queue_limit_exceeded", "voice_task_queue_timeout"}
VOICE_TASK_TRANSIENT_OPENAI_ERROR_CODES = {"openai_rate_limited", "openai_unavailable"}
VOICE_TASK_LIMIT_EXCLUDED_ERROR_CODES = (
VOICE_TASK_RATE_LIMIT_ERROR_CODES | VOICE_TASK_CONCURRENCY_ERROR_CODES | VOICE_TASK_QUEUE_ERROR_CODES
)
VOICE_TASK_PROCESSING_SESSION_STATUSES = {
VoiceTaskSession.Status.PROCESSING,
VoiceTaskSession.Status.UPLOADED,
VoiceTaskSession.Status.TRANSCRIBING,
VoiceTaskSession.Status.TRANSCRIBED,
VoiceTaskSession.Status.PARSING,
}
VOICE_TASK_WAITING_SESSION_STATUSES = {VoiceTaskSession.Status.QUEUED}
VOICE_TASK_ACTIVE_SESSION_STATUSES = VOICE_TASK_WAITING_SESSION_STATUSES | VOICE_TASK_PROCESSING_SESSION_STATUSES
VOICE_TASK_PROJECT_MATCH_THRESHOLD = 0.8
VOICE_TASK_ASSIGNEE_MATCH_THRESHOLD = 0.8
VOICE_TASK_STATE_MATCH_THRESHOLD = 0.8
@ -545,7 +552,12 @@ def get_voice_task_duration_ms(started_at, finished_at=None):
def get_voice_task_concurrency_state(workspace, ai_settings):
active_sessions = VoiceTaskSession.objects.filter(
workspace=workspace,
status__in=VOICE_TASK_ACTIVE_SESSION_STATUSES,
status__in=VOICE_TASK_PROCESSING_SESSION_STATUSES,
updated_at__gte=timezone.now() - timedelta(seconds=VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS),
).count()
queued_sessions = VoiceTaskSession.objects.filter(
workspace=workspace,
status__in=VOICE_TASK_WAITING_SESSION_STATUSES,
updated_at__gte=timezone.now() - timedelta(seconds=VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS),
).count()
concurrency_limit = max(int(ai_settings.workspace_concurrency_limit or 0), 0)
@ -554,6 +566,7 @@ def get_voice_task_concurrency_state(workspace, ai_settings):
"scope": "workspace",
"limit": concurrency_limit,
"used": active_sessions,
"queued": queued_sessions,
"exceeded": bool(concurrency_limit and active_sessions >= concurrency_limit),
"retry_after": VOICE_TASK_CONCURRENCY_RETRY_AFTER_SECONDS,
}
@ -574,6 +587,26 @@ def get_voice_task_concurrency_error(workspace, ai_settings):
}
def get_voice_task_queue_limit_error(workspace):
queued_sessions = VoiceTaskSession.objects.filter(
workspace=workspace,
status__in=VOICE_TASK_WAITING_SESSION_STATUSES,
updated_at__gte=timezone.now() - timedelta(seconds=VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS),
).count()
if queued_sessions < VOICE_TASK_MAX_QUEUED_SESSIONS_PER_WORKSPACE:
return None
return {
"code": "voice_task_workspace_queue_limit_exceeded",
"message": "Voice Tasker queue is full for this workspace.",
"scope": "workspace",
"limit": VOICE_TASK_MAX_QUEUED_SESSIONS_PER_WORKSPACE,
"used": queued_sessions,
"retry_after": VOICE_TASK_CONCURRENCY_RETRY_AFTER_SECONDS,
}
def create_voice_task_rate_limit_session(
workspace,
user,
@ -660,8 +693,8 @@ def reserve_voice_task_session(
)
return None, voice_session, rate_limit_error
concurrency_error = get_voice_task_concurrency_error(workspace, ai_settings)
if concurrency_error:
queue_limit_error = get_voice_task_queue_limit_error(workspace)
if queue_limit_error:
voice_session = create_voice_task_failed_preflight_session(
workspace=workspace,
user=user,
@ -669,10 +702,10 @@ def reserve_voice_task_session(
audio_content_type=audio_content_type,
duration_seconds=duration_seconds,
client_context=client_context,
error=concurrency_error,
error=queue_limit_error,
project=quota_project,
)
return None, voice_session, concurrency_error
return None, voice_session, queue_limit_error
voice_session = VoiceTaskSession(
workspace=workspace,
@ -2669,16 +2702,129 @@ def voice_task_requires_confirmation(parsed, warnings):
)
def process_voice_task_session_pipeline(voice_session_id):
voice_session = VoiceTaskSession.objects.select_related("workspace", "user", "project").filter(
id=voice_session_id
).first()
def claim_voice_task_processing_slot(voice_session_id):
with transaction.atomic():
voice_session = (
VoiceTaskSession.objects.select_for_update(of=("self",))
.select_related("workspace", "user", "project")
.filter(id=voice_session_id)
.first()
)
if not voice_session:
return
return None, None
if voice_session.status == VoiceTaskSession.Status.PARSED:
return
if voice_session.status in {VoiceTaskSession.Status.PARSED, VoiceTaskSession.Status.FAILED}:
return voice_session, None
if voice_session.status != VoiceTaskSession.Status.QUEUED:
return voice_session, None
if not voice_session.audio_file:
return voice_session, None
with connection.cursor() as cursor:
cursor.execute("SELECT pg_advisory_xact_lock(%s)", [get_voice_task_workspace_lock_key(voice_session.workspace_id)])
ai_settings = WorkspaceAISettings.objects.select_for_update().filter(workspace=voice_session.workspace).first()
if not ai_settings:
now = timezone.now()
voice_session.status = VoiceTaskSession.Status.FAILED
voice_session.failed_at = now
voice_session.error_code = "not_configured"
voice_session.error_message = "Voice Tasker is not configured for this workspace."
voice_session.save(update_fields=["status", "failed_at", "error_code", "error_message", "updated_at"])
return voice_session, None
concurrency_error = get_voice_task_concurrency_error(voice_session.workspace, ai_settings)
if concurrency_error:
voice_session.save(update_fields=["updated_at"])
return voice_session, {
"retry": True,
"code": concurrency_error["code"],
"message": concurrency_error["message"],
"countdown": concurrency_error["retry_after"],
}
voice_session.status = VoiceTaskSession.Status.PROCESSING
voice_session.processing_started_at = timezone.now()
voice_session.error_code = ""
voice_session.error_message = ""
voice_session.save(
update_fields=[
"status",
"processing_started_at",
"error_code",
"error_message",
"updated_at",
]
)
return voice_session, None
def requeue_voice_task_session(voice_session, retry_error, countdown=VOICE_TASK_OPENAI_RETRY_AFTER_SECONDS):
voice_session.status = VoiceTaskSession.Status.QUEUED
voice_session.error_code = retry_error.code
voice_session.error_message = retry_error.message
voice_session.save(update_fields=["status", "error_code", "error_message", "updated_at"])
return {
"retry": True,
"code": retry_error.code,
"message": retry_error.message,
"countdown": countdown,
}
def fail_voice_task_session(voice_session_id, code, message):
voice_session = VoiceTaskSession.objects.filter(id=voice_session_id).first()
if not voice_session or voice_session.status in {VoiceTaskSession.Status.PARSED, VoiceTaskSession.Status.FAILED}:
return None
now = timezone.now()
voice_session.status = VoiceTaskSession.Status.FAILED
voice_session.failed_at = now
voice_session.processing_duration_ms = get_voice_task_duration_ms(voice_session.processing_started_at, now)
voice_session.error_code = code
voice_session.error_message = message
voice_session.save(
update_fields=[
"status",
"failed_at",
"processing_duration_ms",
"error_code",
"error_message",
"updated_at",
]
)
try:
clear_voice_task_audio_file(voice_session)
except Exception as exc:
log_exception(exc)
return voice_session
def process_voice_task_session_pipeline(voice_session_id):
voice_session, queue_retry = claim_voice_task_processing_slot(voice_session_id)
if not voice_session:
return {"ok": False, "code": "voice_session_not_found"}
if queue_retry:
return queue_retry
if voice_session.status in {VoiceTaskSession.Status.PARSED, VoiceTaskSession.Status.FAILED}:
return {"ok": True, "status": voice_session.status}
if voice_session.status == VoiceTaskSession.Status.QUEUED and not voice_session.audio_file:
fail_voice_task_session(
voice_session.id,
"missing_audio",
"Voice Tasker audio file is not available for processing.",
)
return {"ok": False, "code": "missing_audio"}
if voice_session.status != VoiceTaskSession.Status.PROCESSING:
return {"ok": True, "status": voice_session.status}
should_clear_audio_file = False
try:
if not voice_session.audio_file:
raise VoiceTaskerPipelineError(
@ -2692,10 +2838,6 @@ def process_voice_task_session_pipeline(voice_session_id):
client_context = voice_session.client_context or {}
ai_settings, api_key = get_workspace_ai_runtime(workspace)
voice_session.status = VoiceTaskSession.Status.PROCESSING
voice_session.processing_started_at = timezone.now()
voice_session.save(update_fields=["status", "processing_started_at", "updated_at"])
voice_session.status = VoiceTaskSession.Status.TRANSCRIBING
voice_session.save(update_fields=["status", "updated_at"])
@ -2794,8 +2936,16 @@ def process_voice_task_session_pipeline(voice_session_id):
"updated_at",
]
)
should_clear_audio_file = True
except VoiceTaskerPipelineError as exc:
pipeline_error = exc
if pipeline_error.code in VOICE_TASK_TRANSIENT_OPENAI_ERROR_CODES:
return requeue_voice_task_session(
voice_session,
pipeline_error,
countdown=VOICE_TASK_OPENAI_RETRY_AFTER_SECONDS,
)
voice_session.status = VoiceTaskSession.Status.FAILED
voice_session.failed_at = timezone.now()
voice_session.processing_duration_ms = get_voice_task_duration_ms(
@ -2814,8 +2964,16 @@ def process_voice_task_session_pipeline(voice_session_id):
"updated_at",
]
)
should_clear_audio_file = True
except Exception as exc:
pipeline_error = get_openai_pipeline_error(exc)
if pipeline_error.code in VOICE_TASK_TRANSIENT_OPENAI_ERROR_CODES:
return requeue_voice_task_session(
voice_session,
pipeline_error,
countdown=VOICE_TASK_OPENAI_RETRY_AFTER_SECONDS,
)
voice_session.status = VoiceTaskSession.Status.FAILED
voice_session.failed_at = timezone.now()
voice_session.processing_duration_ms = get_voice_task_duration_ms(
@ -2834,7 +2992,9 @@ def process_voice_task_session_pipeline(voice_session_id):
"updated_at",
]
)
should_clear_audio_file = True
finally:
if should_clear_audio_file:
try:
clear_voice_task_audio_file(voice_session)
except Exception as exc:

View File

@ -5,11 +5,24 @@
from celery import shared_task
@shared_task
def process_voice_task_session(voice_session_id):
from plane.app.views.voice_tasker import process_voice_task_session_pipeline
@shared_task(bind=True, max_retries=60)
def process_voice_task_session(self, voice_session_id):
from plane.app.views.voice_tasker import fail_voice_task_session, process_voice_task_session_pipeline
process_voice_task_session_pipeline(voice_session_id)
result = process_voice_task_session_pipeline(voice_session_id) or {}
if not result.get("retry"):
return result
if self.request.retries >= self.max_retries:
fail_voice_task_session(
voice_session_id,
"voice_task_queue_timeout",
"Voice Tasker session stayed in retry queue for too long.",
)
return {"ok": False, "code": "voice_task_queue_timeout"}
countdown = int(result.get("countdown") or 15)
raise self.retry(countdown=countdown, exc=Exception(result.get("code") or "voice_task_retry"))
@shared_task

View File

@ -630,22 +630,28 @@ type TLimitUsageOverviewProps = {
function LimitUsageOverview({ formState, monitor }: TLimitUsageOverviewProps) {
const totalSessions = monitor?.summary.total ?? 0;
const activeSessions = monitor?.summary.active ?? 0;
const processingSessions = monitor?.concurrency.used ?? 0;
const queuedSessions = monitor?.concurrency.queued ?? monitor?.summary.status_counts?.queued ?? 0;
const workspaceDailyRatio = getLimitRatio(totalSessions, formState.workspace_daily_limit);
const concurrencyRatio = getLimitRatio(activeSessions, formState.workspace_concurrency_limit);
const concurrencyRatio = getLimitRatio(processingSessions, formState.workspace_concurrency_limit);
return (
<div className="grid gap-3 border-t border-white/5 px-5 py-5 md:grid-cols-4">
<div className="grid gap-3 border-t border-white/5 px-5 py-5 md:grid-cols-5">
<LimitUsageCard
label="Workspace / сутки"
value={`${formatMonitorNumber(totalSessions)} / ${formatMonitorNumber(formState.workspace_daily_limit)}`}
ratio={workspaceDailyRatio}
/>
<LimitUsageCard
label="Очередь сейчас"
value={`${formatMonitorNumber(activeSessions)} / ${formatMonitorNumber(formState.workspace_concurrency_limit)}`}
label="OpenAI слоты"
value={`${formatMonitorNumber(processingSessions)} / ${formatMonitorNumber(formState.workspace_concurrency_limit)}`}
ratio={concurrencyRatio}
/>
<LimitUsageCard
label="В очереди"
value={formatMonitorNumber(queuedSessions)}
ratio={0}
/>
<LimitUsageCard
label="Project / сутки"
value={formatMonitorNumber(formState.project_daily_limit)}
@ -899,9 +905,13 @@ function VoiceTaskMonitorSection({
<div className="flex flex-col gap-5 border-t border-white/5 px-5 py-5">
<div className="grid gap-3 md:grid-cols-4">
<MonitorMetricCard
label="Активные"
value={`${monitor.summary.active}/${monitor.concurrency.limit}`}
meta={staleCount ? `зависшие: ${staleCount}` : "очередь в норме"}
label="OpenAI слоты"
value={`${monitor.concurrency.used}/${monitor.concurrency.limit}`}
meta={
staleCount
? `зависшие: ${staleCount}`
: `в очереди: ${formatMonitorNumber(monitor.concurrency.queued ?? monitor.summary.status_counts?.queued ?? 0)}`
}
tone={staleCount ? "danger" : "accent"}
/>
<MonitorMetricCard

View File

@ -279,6 +279,7 @@ export type TVoiceTaskMonitor = {
scope: "workspace";
limit: number;
used: number;
queued?: number;
exceeded: boolean;
retry_after: number;
};