diff --git a/plane-src/apps/api/plane/app/views/voice_tasker.py b/plane-src/apps/api/plane/app/views/voice_tasker.py index 39efa90..78221ff 100644 --- a/plane-src/apps/api/plane/app/views/voice_tasker.py +++ b/plane-src/apps/api/plane/app/views/voice_tasker.py @@ -67,7 +67,9 @@ VOICE_TASK_CONTEXT_LIMIT = 100 VOICE_TASK_RATE_LIMIT_HOURLY_WINDOW_SECONDS = 60 * 60 VOICE_TASK_RATE_LIMIT_DAILY_WINDOW_SECONDS = 24 * 60 * 60 VOICE_TASK_CONCURRENCY_RETRY_AFTER_SECONDS = 15 +VOICE_TASK_OPENAI_RETRY_AFTER_SECONDS = 30 VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS = 30 * 60 +VOICE_TASK_MAX_QUEUED_SESSIONS_PER_WORKSPACE = 100 VOICE_TASK_MONITOR_WINDOW_SECONDS = 24 * 60 * 60 VOICE_TASK_MONITOR_RECENT_LIMIT = 20 VOICE_TASK_RETENTION_BATCH_SIZE = 500 @@ -79,15 +81,20 @@ VOICE_TASK_RATE_LIMIT_ERROR_CODES = { "voice_task_project_daily_limit_exceeded", } VOICE_TASK_CONCURRENCY_ERROR_CODES = {"voice_task_workspace_concurrency_limit_exceeded"} -VOICE_TASK_LIMIT_EXCLUDED_ERROR_CODES = VOICE_TASK_RATE_LIMIT_ERROR_CODES | VOICE_TASK_CONCURRENCY_ERROR_CODES -VOICE_TASK_ACTIVE_SESSION_STATUSES = { - VoiceTaskSession.Status.QUEUED, +VOICE_TASK_QUEUE_ERROR_CODES = {"voice_task_workspace_queue_limit_exceeded", "voice_task_queue_timeout"} +VOICE_TASK_TRANSIENT_OPENAI_ERROR_CODES = {"openai_rate_limited", "openai_unavailable"} +VOICE_TASK_LIMIT_EXCLUDED_ERROR_CODES = ( + VOICE_TASK_RATE_LIMIT_ERROR_CODES | VOICE_TASK_CONCURRENCY_ERROR_CODES | VOICE_TASK_QUEUE_ERROR_CODES +) +VOICE_TASK_PROCESSING_SESSION_STATUSES = { VoiceTaskSession.Status.PROCESSING, VoiceTaskSession.Status.UPLOADED, VoiceTaskSession.Status.TRANSCRIBING, VoiceTaskSession.Status.TRANSCRIBED, VoiceTaskSession.Status.PARSING, } +VOICE_TASK_WAITING_SESSION_STATUSES = {VoiceTaskSession.Status.QUEUED} +VOICE_TASK_ACTIVE_SESSION_STATUSES = VOICE_TASK_WAITING_SESSION_STATUSES | VOICE_TASK_PROCESSING_SESSION_STATUSES VOICE_TASK_PROJECT_MATCH_THRESHOLD = 0.8 VOICE_TASK_ASSIGNEE_MATCH_THRESHOLD = 0.8 VOICE_TASK_STATE_MATCH_THRESHOLD = 0.8 @@ -545,7 +552,12 @@ def get_voice_task_duration_ms(started_at, finished_at=None): def get_voice_task_concurrency_state(workspace, ai_settings): active_sessions = VoiceTaskSession.objects.filter( workspace=workspace, - status__in=VOICE_TASK_ACTIVE_SESSION_STATUSES, + status__in=VOICE_TASK_PROCESSING_SESSION_STATUSES, + updated_at__gte=timezone.now() - timedelta(seconds=VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS), + ).count() + queued_sessions = VoiceTaskSession.objects.filter( + workspace=workspace, + status__in=VOICE_TASK_WAITING_SESSION_STATUSES, updated_at__gte=timezone.now() - timedelta(seconds=VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS), ).count() concurrency_limit = max(int(ai_settings.workspace_concurrency_limit or 0), 0) @@ -554,6 +566,7 @@ def get_voice_task_concurrency_state(workspace, ai_settings): "scope": "workspace", "limit": concurrency_limit, "used": active_sessions, + "queued": queued_sessions, "exceeded": bool(concurrency_limit and active_sessions >= concurrency_limit), "retry_after": VOICE_TASK_CONCURRENCY_RETRY_AFTER_SECONDS, } @@ -574,6 +587,26 @@ def get_voice_task_concurrency_error(workspace, ai_settings): } +def get_voice_task_queue_limit_error(workspace): + queued_sessions = VoiceTaskSession.objects.filter( + workspace=workspace, + status__in=VOICE_TASK_WAITING_SESSION_STATUSES, + updated_at__gte=timezone.now() - timedelta(seconds=VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS), + ).count() + + if queued_sessions < VOICE_TASK_MAX_QUEUED_SESSIONS_PER_WORKSPACE: + return None + + return { + "code": "voice_task_workspace_queue_limit_exceeded", + "message": "Voice Tasker queue is full for this workspace.", + "scope": "workspace", + "limit": VOICE_TASK_MAX_QUEUED_SESSIONS_PER_WORKSPACE, + "used": queued_sessions, + "retry_after": VOICE_TASK_CONCURRENCY_RETRY_AFTER_SECONDS, + } + + def create_voice_task_rate_limit_session( workspace, user, @@ -660,8 +693,8 @@ def reserve_voice_task_session( ) return None, voice_session, rate_limit_error - concurrency_error = get_voice_task_concurrency_error(workspace, ai_settings) - if concurrency_error: + queue_limit_error = get_voice_task_queue_limit_error(workspace) + if queue_limit_error: voice_session = create_voice_task_failed_preflight_session( workspace=workspace, user=user, @@ -669,10 +702,10 @@ def reserve_voice_task_session( audio_content_type=audio_content_type, duration_seconds=duration_seconds, client_context=client_context, - error=concurrency_error, + error=queue_limit_error, project=quota_project, ) - return None, voice_session, concurrency_error + return None, voice_session, queue_limit_error voice_session = VoiceTaskSession( workspace=workspace, @@ -2669,16 +2702,129 @@ def voice_task_requires_confirmation(parsed, warnings): ) +def claim_voice_task_processing_slot(voice_session_id): + with transaction.atomic(): + voice_session = ( + VoiceTaskSession.objects.select_for_update(of=("self",)) + .select_related("workspace", "user", "project") + .filter(id=voice_session_id) + .first() + ) + if not voice_session: + return None, None + + if voice_session.status in {VoiceTaskSession.Status.PARSED, VoiceTaskSession.Status.FAILED}: + return voice_session, None + + if voice_session.status != VoiceTaskSession.Status.QUEUED: + return voice_session, None + + if not voice_session.audio_file: + return voice_session, None + + with connection.cursor() as cursor: + cursor.execute("SELECT pg_advisory_xact_lock(%s)", [get_voice_task_workspace_lock_key(voice_session.workspace_id)]) + + ai_settings = WorkspaceAISettings.objects.select_for_update().filter(workspace=voice_session.workspace).first() + if not ai_settings: + now = timezone.now() + voice_session.status = VoiceTaskSession.Status.FAILED + voice_session.failed_at = now + voice_session.error_code = "not_configured" + voice_session.error_message = "Voice Tasker is not configured for this workspace." + voice_session.save(update_fields=["status", "failed_at", "error_code", "error_message", "updated_at"]) + return voice_session, None + + concurrency_error = get_voice_task_concurrency_error(voice_session.workspace, ai_settings) + if concurrency_error: + voice_session.save(update_fields=["updated_at"]) + return voice_session, { + "retry": True, + "code": concurrency_error["code"], + "message": concurrency_error["message"], + "countdown": concurrency_error["retry_after"], + } + + voice_session.status = VoiceTaskSession.Status.PROCESSING + voice_session.processing_started_at = timezone.now() + voice_session.error_code = "" + voice_session.error_message = "" + voice_session.save( + update_fields=[ + "status", + "processing_started_at", + "error_code", + "error_message", + "updated_at", + ] + ) + return voice_session, None + + +def requeue_voice_task_session(voice_session, retry_error, countdown=VOICE_TASK_OPENAI_RETRY_AFTER_SECONDS): + voice_session.status = VoiceTaskSession.Status.QUEUED + voice_session.error_code = retry_error.code + voice_session.error_message = retry_error.message + voice_session.save(update_fields=["status", "error_code", "error_message", "updated_at"]) + return { + "retry": True, + "code": retry_error.code, + "message": retry_error.message, + "countdown": countdown, + } + + +def fail_voice_task_session(voice_session_id, code, message): + voice_session = VoiceTaskSession.objects.filter(id=voice_session_id).first() + if not voice_session or voice_session.status in {VoiceTaskSession.Status.PARSED, VoiceTaskSession.Status.FAILED}: + return None + + now = timezone.now() + voice_session.status = VoiceTaskSession.Status.FAILED + voice_session.failed_at = now + voice_session.processing_duration_ms = get_voice_task_duration_ms(voice_session.processing_started_at, now) + voice_session.error_code = code + voice_session.error_message = message + voice_session.save( + update_fields=[ + "status", + "failed_at", + "processing_duration_ms", + "error_code", + "error_message", + "updated_at", + ] + ) + try: + clear_voice_task_audio_file(voice_session) + except Exception as exc: + log_exception(exc) + return voice_session + + def process_voice_task_session_pipeline(voice_session_id): - voice_session = VoiceTaskSession.objects.select_related("workspace", "user", "project").filter( - id=voice_session_id - ).first() + voice_session, queue_retry = claim_voice_task_processing_slot(voice_session_id) if not voice_session: - return + return {"ok": False, "code": "voice_session_not_found"} - if voice_session.status == VoiceTaskSession.Status.PARSED: - return + if queue_retry: + return queue_retry + if voice_session.status in {VoiceTaskSession.Status.PARSED, VoiceTaskSession.Status.FAILED}: + return {"ok": True, "status": voice_session.status} + + if voice_session.status == VoiceTaskSession.Status.QUEUED and not voice_session.audio_file: + fail_voice_task_session( + voice_session.id, + "missing_audio", + "Voice Tasker audio file is not available for processing.", + ) + return {"ok": False, "code": "missing_audio"} + + if voice_session.status != VoiceTaskSession.Status.PROCESSING: + return {"ok": True, "status": voice_session.status} + + should_clear_audio_file = False try: if not voice_session.audio_file: raise VoiceTaskerPipelineError( @@ -2692,10 +2838,6 @@ def process_voice_task_session_pipeline(voice_session_id): client_context = voice_session.client_context or {} ai_settings, api_key = get_workspace_ai_runtime(workspace) - voice_session.status = VoiceTaskSession.Status.PROCESSING - voice_session.processing_started_at = timezone.now() - voice_session.save(update_fields=["status", "processing_started_at", "updated_at"]) - voice_session.status = VoiceTaskSession.Status.TRANSCRIBING voice_session.save(update_fields=["status", "updated_at"]) @@ -2794,8 +2936,16 @@ def process_voice_task_session_pipeline(voice_session_id): "updated_at", ] ) + should_clear_audio_file = True except VoiceTaskerPipelineError as exc: pipeline_error = exc + if pipeline_error.code in VOICE_TASK_TRANSIENT_OPENAI_ERROR_CODES: + return requeue_voice_task_session( + voice_session, + pipeline_error, + countdown=VOICE_TASK_OPENAI_RETRY_AFTER_SECONDS, + ) + voice_session.status = VoiceTaskSession.Status.FAILED voice_session.failed_at = timezone.now() voice_session.processing_duration_ms = get_voice_task_duration_ms( @@ -2814,8 +2964,16 @@ def process_voice_task_session_pipeline(voice_session_id): "updated_at", ] ) + should_clear_audio_file = True except Exception as exc: pipeline_error = get_openai_pipeline_error(exc) + if pipeline_error.code in VOICE_TASK_TRANSIENT_OPENAI_ERROR_CODES: + return requeue_voice_task_session( + voice_session, + pipeline_error, + countdown=VOICE_TASK_OPENAI_RETRY_AFTER_SECONDS, + ) + voice_session.status = VoiceTaskSession.Status.FAILED voice_session.failed_at = timezone.now() voice_session.processing_duration_ms = get_voice_task_duration_ms( @@ -2834,11 +2992,13 @@ def process_voice_task_session_pipeline(voice_session_id): "updated_at", ] ) + should_clear_audio_file = True finally: - try: - clear_voice_task_audio_file(voice_session) - except Exception as exc: - log_exception(exc) + if should_clear_audio_file: + try: + clear_voice_task_audio_file(voice_session) + except Exception as exc: + log_exception(exc) def serialize_voice_task_session_response(voice_session): diff --git a/plane-src/apps/api/plane/bgtasks/voice_tasker_task.py b/plane-src/apps/api/plane/bgtasks/voice_tasker_task.py index e44b829..c46b069 100644 --- a/plane-src/apps/api/plane/bgtasks/voice_tasker_task.py +++ b/plane-src/apps/api/plane/bgtasks/voice_tasker_task.py @@ -5,11 +5,24 @@ from celery import shared_task -@shared_task -def process_voice_task_session(voice_session_id): - from plane.app.views.voice_tasker import process_voice_task_session_pipeline +@shared_task(bind=True, max_retries=60) +def process_voice_task_session(self, voice_session_id): + from plane.app.views.voice_tasker import fail_voice_task_session, process_voice_task_session_pipeline - process_voice_task_session_pipeline(voice_session_id) + result = process_voice_task_session_pipeline(voice_session_id) or {} + if not result.get("retry"): + return result + + if self.request.retries >= self.max_retries: + fail_voice_task_session( + voice_session_id, + "voice_task_queue_timeout", + "Voice Tasker session stayed in retry queue for too long.", + ) + return {"ok": False, "code": "voice_task_queue_timeout"} + + countdown = int(result.get("countdown") or 15) + raise self.retry(countdown=countdown, exc=Exception(result.get("code") or "voice_task_retry")) @shared_task diff --git a/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx b/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx index 372be16..f594846 100644 --- a/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx +++ b/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx @@ -630,22 +630,28 @@ type TLimitUsageOverviewProps = { function LimitUsageOverview({ formState, monitor }: TLimitUsageOverviewProps) { const totalSessions = monitor?.summary.total ?? 0; - const activeSessions = monitor?.summary.active ?? 0; + const processingSessions = monitor?.concurrency.used ?? 0; + const queuedSessions = monitor?.concurrency.queued ?? monitor?.summary.status_counts?.queued ?? 0; const workspaceDailyRatio = getLimitRatio(totalSessions, formState.workspace_daily_limit); - const concurrencyRatio = getLimitRatio(activeSessions, formState.workspace_concurrency_limit); + const concurrencyRatio = getLimitRatio(processingSessions, formState.workspace_concurrency_limit); return ( -
+
+