From 3b295e33f33c12e763aac16b2adf11da2748ad0b Mon Sep 17 00:00:00 2001 From: DCCONSTRUCTIONS Date: Tue, 28 Apr 2026 15:43:01 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A4=D0=A3=D0=9D=D0=9A=D0=A6=D0=98=D0=98=20-?= =?UTF-8?q?=20=D0=9C=D0=95=D0=96=D0=9F=D0=A0=D0=9E=D0=95=D0=9A=D0=A2=D0=9D?= =?UTF-8?q?=D0=90=D0=AF=20=D0=9A=D0=9E=D0=9C=D0=9C=D0=A3=D0=9D=D0=98=D0=9A?= =?UTF-8?q?=D0=90=D0=A6=D0=98=D0=AF:=20=D0=BE=D1=87=D0=B5=D1=80=D0=B5?= =?UTF-8?q?=D0=B4=D1=8C=20Voice=20Tasker=20=D0=B8=20concurrency=20control?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/plane/app/serializers/voice_tasker.py | 6 + .../apps/api/plane/app/urls/voice_tasker.py | 6 + .../apps/api/plane/app/views/__init__.py | 1 + .../apps/api/plane/app/views/voice_tasker.py | 535 +++++++++++++----- .../api/plane/bgtasks/voice_tasker_task.py | 12 + .../0132_voice_tasker_queue_concurrency.py | 40 ++ .../apps/api/plane/db/models/voice_tasker.py | 4 + plane-src/apps/api/plane/settings/common.py | 1 + .../voice-tasker/global-control.tsx | 43 +- .../settings/ai-voice-tasker-settings.tsx | 12 + .../web/core/services/workspace-ai.service.ts | 8 + plane-src/packages/types/src/ai.ts | 6 +- 12 files changed, 530 insertions(+), 144 deletions(-) create mode 100644 plane-src/apps/api/plane/bgtasks/voice_tasker_task.py create mode 100644 plane-src/apps/api/plane/db/migrations/0132_voice_tasker_queue_concurrency.py diff --git a/plane-src/apps/api/plane/app/serializers/voice_tasker.py b/plane-src/apps/api/plane/app/serializers/voice_tasker.py index 832ec5d..2a3abf6 100644 --- a/plane-src/apps/api/plane/app/serializers/voice_tasker.py +++ b/plane-src/apps/api/plane/app/serializers/voice_tasker.py @@ -36,6 +36,7 @@ class WorkspaceAISettingsSerializer(BaseSerializer): "per_user_daily_limit", "workspace_daily_limit", "project_daily_limit", + "workspace_concurrency_limit", "credential", "openai_api_key", "created_at", @@ -123,6 +124,11 @@ class WorkspaceAISettingsSerializer(BaseSerializer): raise serializers.ValidationError("Project daily limit must be between 1 and 50000.") return value + def validate_workspace_concurrency_limit(self, value): + if value < 1 or value > 50: + raise serializers.ValidationError("Workspace concurrency limit must be between 1 and 50.") + return value + def update(self, instance, validated_data): api_key = validated_data.pop("openai_api_key", None) default_project_id = validated_data.pop("default_project_id", serializers.empty) diff --git a/plane-src/apps/api/plane/app/urls/voice_tasker.py b/plane-src/apps/api/plane/app/urls/voice_tasker.py index 39c38e2..06568aa 100644 --- a/plane-src/apps/api/plane/app/urls/voice_tasker.py +++ b/plane-src/apps/api/plane/app/urls/voice_tasker.py @@ -8,6 +8,7 @@ from plane.app.views import ( VoiceTaskCommitEndpoint, VoiceTaskParseEndpoint, VoiceTaskPreflightEndpoint, + VoiceTaskSessionEndpoint, WorkspaceAISettingsEndpoint, WorkspaceAISettingsTestConnectionEndpoint, ) @@ -34,6 +35,11 @@ urlpatterns = [ VoiceTaskParseEndpoint.as_view(), name="voice-task-parse", ), + path( + "workspaces//voice-task/sessions//", + VoiceTaskSessionEndpoint.as_view(), + name="voice-task-session", + ), path( "workspaces//voice-task/commit/", VoiceTaskCommitEndpoint.as_view(), diff --git a/plane-src/apps/api/plane/app/views/__init__.py b/plane-src/apps/api/plane/app/views/__init__.py index a41bdff..a43f1a1 100644 --- a/plane-src/apps/api/plane/app/views/__init__.py +++ b/plane-src/apps/api/plane/app/views/__init__.py @@ -253,6 +253,7 @@ from .voice_tasker import ( VoiceTaskCommitEndpoint, VoiceTaskParseEndpoint, VoiceTaskPreflightEndpoint, + VoiceTaskSessionEndpoint, WorkspaceAISettingsEndpoint, WorkspaceAISettingsTestConnectionEndpoint, ) diff --git a/plane-src/apps/api/plane/app/views/voice_tasker.py b/plane-src/apps/api/plane/app/views/voice_tasker.py index 4042b2e..8d3a427 100644 --- a/plane-src/apps/api/plane/app/views/voice_tasker.py +++ b/plane-src/apps/api/plane/app/views/voice_tasker.py @@ -7,14 +7,17 @@ import json import math import re import uuid +import zlib from datetime import date, timedelta from difflib import SequenceMatcher from html import escape from zoneinfo import ZoneInfo, ZoneInfoNotFoundError +from django.core.files.base import ContentFile from django.core.serializers.json import DjangoJSONEncoder from django.db import connection, transaction from django.db.models import Max, Q +from django.utils.text import get_valid_filename from django.utils import timezone from openai import OpenAI @@ -63,6 +66,8 @@ VOICE_TASK_MEMORY_LIMIT = 5 VOICE_TASK_CONTEXT_LIMIT = 100 VOICE_TASK_RATE_LIMIT_HOURLY_WINDOW_SECONDS = 60 * 60 VOICE_TASK_RATE_LIMIT_DAILY_WINDOW_SECONDS = 24 * 60 * 60 +VOICE_TASK_CONCURRENCY_RETRY_AFTER_SECONDS = 15 +VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS = 30 * 60 VOICE_TASK_RATE_LIMIT_ERROR_CODES = { "voice_task_user_hourly_limit_exceeded", "voice_task_workspace_hourly_limit_exceeded", @@ -70,6 +75,16 @@ VOICE_TASK_RATE_LIMIT_ERROR_CODES = { "voice_task_workspace_daily_limit_exceeded", "voice_task_project_daily_limit_exceeded", } +VOICE_TASK_CONCURRENCY_ERROR_CODES = {"voice_task_workspace_concurrency_limit_exceeded"} +VOICE_TASK_LIMIT_EXCLUDED_ERROR_CODES = VOICE_TASK_RATE_LIMIT_ERROR_CODES | VOICE_TASK_CONCURRENCY_ERROR_CODES +VOICE_TASK_ACTIVE_SESSION_STATUSES = { + VoiceTaskSession.Status.QUEUED, + VoiceTaskSession.Status.PROCESSING, + VoiceTaskSession.Status.UPLOADED, + VoiceTaskSession.Status.TRANSCRIBING, + VoiceTaskSession.Status.TRANSCRIBED, + VoiceTaskSession.Status.PARSING, +} VOICE_TASK_PROJECT_MATCH_THRESHOLD = 0.8 VOICE_TASK_ASSIGNEE_MATCH_THRESHOLD = 0.8 VOICE_TASK_STATE_MATCH_THRESHOLD = 0.8 @@ -345,7 +360,7 @@ def get_voice_task_limit_sessions(workspace, window_seconds, now=None): workspace=workspace, created_at__gte=window_start, created_at__lte=now, - ).exclude(error_code__in=VOICE_TASK_RATE_LIMIT_ERROR_CODES) + ).exclude(error_code__in=VOICE_TASK_LIMIT_EXCLUDED_ERROR_CODES) def get_voice_task_rate_limit_state(workspace, user, project, ai_settings, now=None): @@ -480,6 +495,74 @@ def get_voice_task_rate_limit_error(workspace, user, project, ai_settings, now=N return None +def get_voice_task_workspace_lock_key(workspace_id): + return zlib.crc32(f"voice-tasker:{workspace_id}".encode("utf-8")) + + +def get_voice_task_audio_filename(audio, audio_content_type): + original_name = get_valid_filename(getattr(audio, "name", "") or "") + if original_name: + return f"{uuid.uuid4()}-{original_name}" + + extension = { + "audio/webm": "webm", + "audio/mp4": "m4a", + "audio/mpeg": "mp3", + "audio/wav": "wav", + }.get(audio_content_type, "webm") + return f"{uuid.uuid4()}.{extension}" + + +def save_voice_task_audio_file(voice_session, audio, audio_content_type): + audio.seek(0) + voice_session.audio_file.save( + get_voice_task_audio_filename(audio, audio_content_type), + ContentFile(audio.read()), + save=False, + ) + + +def clear_voice_task_audio_file(voice_session): + if not voice_session.audio_file: + return + + voice_session.audio_file.delete(save=False) + voice_session.audio_file = None + voice_session.save(update_fields=["audio_file", "updated_at"]) + + +def get_voice_task_concurrency_state(workspace, ai_settings): + active_sessions = VoiceTaskSession.objects.filter( + workspace=workspace, + status__in=VOICE_TASK_ACTIVE_SESSION_STATUSES, + updated_at__gte=timezone.now() - timedelta(seconds=VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS), + ).count() + concurrency_limit = max(int(ai_settings.workspace_concurrency_limit or 0), 0) + + return { + "scope": "workspace", + "limit": concurrency_limit, + "used": active_sessions, + "exceeded": bool(concurrency_limit and active_sessions >= concurrency_limit), + "retry_after": VOICE_TASK_CONCURRENCY_RETRY_AFTER_SECONDS, + } + + +def get_voice_task_concurrency_error(workspace, ai_settings): + state = get_voice_task_concurrency_state(workspace, ai_settings) + if not state["exceeded"]: + return None + + return { + "code": "voice_task_workspace_concurrency_limit_exceeded", + "message": "Voice Tasker processing queue is full for this workspace.", + "scope": state["scope"], + "limit": state["limit"], + "used": state["used"], + "retry_after": state["retry_after"], + } + + def create_voice_task_rate_limit_session( workspace, user, @@ -504,6 +587,96 @@ def create_voice_task_rate_limit_session( ) +def create_voice_task_failed_preflight_session( + workspace, + user, + audio, + audio_content_type, + duration_seconds, + client_context, + error, + project=None, +): + return VoiceTaskSession.objects.create( + workspace=workspace, + user=user, + project=project, + status=VoiceTaskSession.Status.FAILED, + audio_duration_seconds=duration_seconds, + audio_content_type=audio_content_type, + audio_size=getattr(audio, "size", None), + client_context=client_context, + error_code=error["code"], + error_message=error["message"], + ) + + +def reserve_voice_task_session( + workspace, + user, + audio, + audio_content_type, + duration_seconds, + client_context, + request_project_id=None, +): + with transaction.atomic(): + with connection.cursor() as cursor: + cursor.execute("SELECT pg_advisory_xact_lock(%s)", [get_voice_task_workspace_lock_key(workspace.id)]) + + ai_settings = WorkspaceAISettings.objects.select_for_update().filter(workspace=workspace).first() + if not ai_settings: + raise VoiceTaskerPipelineError( + "not_configured", + "Voice Tasker is not configured for this workspace.", + status.HTTP_400_BAD_REQUEST, + ) + + quota_project = get_voice_task_quota_project(workspace, project_id=request_project_id, ai_settings=ai_settings) + rate_limit_error = get_voice_task_rate_limit_error(workspace, user, quota_project, ai_settings) + if rate_limit_error: + voice_session = create_voice_task_rate_limit_session( + workspace=workspace, + user=user, + audio=audio, + audio_content_type=audio_content_type, + duration_seconds=duration_seconds, + client_context=client_context, + rate_limit_error=rate_limit_error, + project=quota_project, + ) + return None, voice_session, rate_limit_error + + concurrency_error = get_voice_task_concurrency_error(workspace, ai_settings) + if concurrency_error: + voice_session = create_voice_task_failed_preflight_session( + workspace=workspace, + user=user, + audio=audio, + audio_content_type=audio_content_type, + duration_seconds=duration_seconds, + client_context=client_context, + error=concurrency_error, + project=quota_project, + ) + return None, voice_session, concurrency_error + + voice_session = VoiceTaskSession( + workspace=workspace, + user=user, + project=quota_project, + status=VoiceTaskSession.Status.QUEUED, + audio_duration_seconds=duration_seconds, + audio_content_type=audio_content_type, + audio_size=getattr(audio, "size", None), + client_context=client_context, + ) + save_voice_task_audio_file(voice_session, audio, audio_content_type) + voice_session.save() + + return voice_session, voice_session, None + + class VoiceTaskerPipelineError(Exception): def __init__(self, code, message, response_status=status.HTTP_400_BAD_REQUEST): self.code = code @@ -573,10 +746,11 @@ class OpenAITranscriptionService: self.client = OpenAI(api_key=api_key) self.model = model - def transcribe(self, audio, language=None): + def transcribe(self, audio, language=None, content_type=None): audio.seek(0) file_name = audio.name or "voice-task.webm" - payload = (file_name, audio.read(), normalize_audio_content_type(audio.content_type) or "audio/webm") + normalized_content_type = normalize_audio_content_type(content_type or getattr(audio, "content_type", "")) + payload = (file_name, audio.read(), normalized_content_type or "audio/webm") params = { "model": self.model, "file": payload, @@ -2473,6 +2647,167 @@ def voice_task_requires_confirmation(parsed, warnings): ) +def process_voice_task_session_pipeline(voice_session_id): + voice_session = VoiceTaskSession.objects.select_related("workspace", "user", "project").filter( + id=voice_session_id + ).first() + if not voice_session: + return + + if voice_session.status == VoiceTaskSession.Status.PARSED: + return + + try: + if not voice_session.audio_file: + raise VoiceTaskerPipelineError( + "missing_audio", + "Voice Tasker audio file is not available for processing.", + status.HTTP_400_BAD_REQUEST, + ) + + workspace = voice_session.workspace + user = voice_session.user + client_context = voice_session.client_context or {} + ai_settings, api_key = get_workspace_ai_runtime(workspace) + + voice_session.status = VoiceTaskSession.Status.PROCESSING + voice_session.save(update_fields=["status", "updated_at"]) + + voice_session.status = VoiceTaskSession.Status.TRANSCRIBING + voice_session.save(update_fields=["status", "updated_at"]) + + voice_session.audio_file.open("rb") + try: + transcript = OpenAITranscriptionService( + api_key=api_key, + model=ai_settings.transcription_model, + ).transcribe( + voice_session.audio_file, + language=get_client_language(client_context), + content_type=voice_session.audio_content_type, + ) + finally: + voice_session.audio_file.close() + + if not transcript: + raise VoiceTaskerPipelineError( + "empty_transcript", + "OpenAI returned an empty transcript.", + status.HTTP_400_BAD_REQUEST, + ) + + voice_session.status = VoiceTaskSession.Status.TRANSCRIBED + voice_session.transcript = transcript + voice_session.save(update_fields=["status", "transcript", "updated_at"]) + + parser_context = build_voice_task_parser_context( + workspace=workspace, + user=user, + transcript=transcript, + client_context=client_context, + ) + + voice_session.status = VoiceTaskSession.Status.PARSING + voice_session.save(update_fields=["status", "updated_at"]) + + parsed = VoiceTaskParserService( + api_key=api_key, + model=ai_settings.structuring_model, + ).parse(parser_context) + if not parsed.get("state_hint"): + inferred_state_hint = infer_voice_task_state_hint(transcript) + if inferred_state_hint: + parsed["state_hint"] = inferred_state_hint + parsed = harden_voice_task_intent(parsed, transcript) + + voice_session.status = VoiceTaskSession.Status.PARSED + voice_session.intent = parsed["intent"] + voice_session.parsed_json = parsed + if not voice_session.project_id and parsed.get("project_id"): + voice_session.project = get_voice_task_quota_project(workspace, project_id=parsed.get("project_id")) + voice_session.error_code = "" + voice_session.error_message = "" + voice_session.save( + update_fields=["status", "intent", "parsed_json", "project", "error_code", "error_message", "updated_at"] + ) + except VoiceTaskerPipelineError as exc: + pipeline_error = exc + voice_session.status = VoiceTaskSession.Status.FAILED + voice_session.error_code = pipeline_error.code + voice_session.error_message = pipeline_error.message + voice_session.save(update_fields=["status", "error_code", "error_message", "updated_at"]) + except Exception as exc: + pipeline_error = get_openai_pipeline_error(exc) + voice_session.status = VoiceTaskSession.Status.FAILED + voice_session.error_code = pipeline_error.code + voice_session.error_message = pipeline_error.message + voice_session.save(update_fields=["status", "error_code", "error_message", "updated_at"]) + finally: + try: + clear_voice_task_audio_file(voice_session) + except Exception as exc: + log_exception(exc) + + +def serialize_voice_task_session_response(voice_session): + base_payload = { + "ok": voice_session.status != VoiceTaskSession.Status.FAILED, + "status": voice_session.status, + "pipeline_status": voice_session.status, + "voice_session_id": str(voice_session.id), + "audio": { + "content_type": voice_session.audio_content_type, + "duration_seconds": voice_session.audio_duration_seconds, + "size": voice_session.audio_size, + }, + "client_context": voice_session.client_context, + } + + if voice_session.status == VoiceTaskSession.Status.FAILED: + base_payload.update( + { + "code": voice_session.error_code, + "error": voice_session.error_message or "Voice Tasker failed to process audio.", + } + ) + return base_payload + + if voice_session.status != VoiceTaskSession.Status.PARSED: + return base_payload + + workspace = voice_session.workspace + ai_settings = WorkspaceAISettings.objects.filter(workspace=workspace).first() + parsed = normalize_voice_task_parse(voice_session.parsed_json or {}) + transcript = voice_session.transcript + warnings = get_voice_task_warnings(parsed, transcript) + resolution = build_voice_task_resolution( + workspace=workspace, + user=voice_session.user, + ai_settings=ai_settings, + draft=parsed, + client_context=voice_session.client_context or {}, + voice_session=voice_session, + transcript=transcript, + ) + warnings = list(dict.fromkeys(warnings + resolution["warnings"])) + + base_payload.update( + { + "transcript": transcript, + "intent": parsed["intent"], + "draft": parsed, + "resolution": resolution, + "warnings": warnings, + "requires_confirmation": voice_task_requires_confirmation(parsed, warnings), + "models": { + "transcription": ai_settings.transcription_model if ai_settings else "", + "structuring": ai_settings.structuring_model if ai_settings else "", + }, + } + ) + return base_payload + + class WorkspaceAISettingsEndpoint(BaseAPIView): def get_settings(self, slug): workspace = Workspace.objects.get(slug=slug) @@ -2637,162 +2972,80 @@ class VoiceTaskParseEndpoint(BaseAPIView): if not isinstance(client_context, dict): client_context = {} - ai_settings = WorkspaceAISettings.objects.filter(workspace=workspace).first() - if not ai_settings: - return Response( - {"ok": False, "code": "not_configured", "error": "Voice Tasker is not configured for this workspace."}, - status=status.HTTP_400_BAD_REQUEST, - ) - - quota_project = get_voice_task_quota_project(workspace, project_id=request_project_id, ai_settings=ai_settings) - rate_limit_error = get_voice_task_rate_limit_error(workspace, request.user, quota_project, ai_settings) - if rate_limit_error: - voice_session = create_voice_task_rate_limit_session( + try: + voice_session, recorded_session, preflight_error = reserve_voice_task_session( workspace=workspace, user=request.user, audio=audio, audio_content_type=audio_content_type, duration_seconds=duration_seconds, client_context=client_context, - rate_limit_error=rate_limit_error, - project=quota_project, + request_project_id=request_project_id, ) + except VoiceTaskerPipelineError as exc: + return Response({"ok": False, "code": exc.code, "error": exc.message}, status=exc.response_status) + + if preflight_error: + error_payload = { + "ok": False, + "voice_session_id": str(recorded_session.id), + "code": preflight_error["code"], + "error": preflight_error["message"], + "limit_scope": preflight_error.get("scope"), + "limit_window": preflight_error.get("window"), + "limit": preflight_error.get("limit"), + "used": preflight_error.get("used"), + "window_seconds": preflight_error.get("window_seconds"), + "retry_after": preflight_error.get("retry_after"), + "project_id": preflight_error.get("project_id"), + "project_identifier": preflight_error.get("project_identifier"), + "project_name": preflight_error.get("project_name"), + } + if preflight_error.get("reset_at"): + error_payload["reset_at"] = preflight_error["reset_at"].isoformat() + return Response(error_payload, status=status.HTTP_429_TOO_MANY_REQUESTS) + + from plane.bgtasks.voice_tasker_task import process_voice_task_session + + try: + process_voice_task_session.delay(str(voice_session.id)) + except Exception as exc: + log_exception(exc) + voice_session.status = VoiceTaskSession.Status.FAILED + voice_session.error_code = "voice_task_queue_unavailable" + voice_session.error_message = "Voice Tasker queue is not available." + voice_session.save(update_fields=["status", "error_code", "error_message", "updated_at"]) + clear_voice_task_audio_file(voice_session) return Response( { "ok": False, "voice_session_id": str(voice_session.id), - "code": rate_limit_error["code"], - "error": rate_limit_error["message"], - "limit_scope": rate_limit_error["scope"], - "limit_window": rate_limit_error["window"], - "limit": rate_limit_error["limit"], - "used": rate_limit_error["used"], - "window_seconds": rate_limit_error["window_seconds"], - "retry_after": rate_limit_error["retry_after"], - "reset_at": rate_limit_error["reset_at"].isoformat(), - "project_id": rate_limit_error["project_id"], - "project_identifier": rate_limit_error["project_identifier"], - "project_name": rate_limit_error["project_name"], + "code": voice_session.error_code, + "error": voice_session.error_message, }, - status=status.HTTP_429_TOO_MANY_REQUESTS, + status=status.HTTP_503_SERVICE_UNAVAILABLE, ) - voice_session = VoiceTaskSession.objects.create( - workspace=workspace, - user=request.user, - project=quota_project, - status=VoiceTaskSession.Status.UPLOADED, - audio_duration_seconds=duration_seconds, - audio_content_type=audio_content_type, - audio_size=audio.size, - client_context=client_context, + return Response(serialize_voice_task_session_response(voice_session), status=status.HTTP_202_ACCEPTED) + + +class VoiceTaskSessionEndpoint(BaseAPIView): + @allow_permission(allowed_roles=[ROLE.ADMIN, ROLE.MEMBER, ROLE.GUEST], level="WORKSPACE") + def get(self, request, slug, session_id): + workspace = Workspace.objects.get(slug=slug) + voice_session = ( + VoiceTaskSession.objects.select_related("workspace", "user", "project") + .filter(workspace=workspace, user=request.user, id=session_id) + .first() ) - - try: - ai_settings, api_key = get_workspace_ai_runtime(workspace) - - voice_session.status = VoiceTaskSession.Status.TRANSCRIBING - voice_session.save(update_fields=["status", "updated_at"]) - - transcript = OpenAITranscriptionService( - api_key=api_key, - model=ai_settings.transcription_model, - ).transcribe(audio, language=get_client_language(client_context)) - - if not transcript: - raise VoiceTaskerPipelineError( - "empty_transcript", - "OpenAI returned an empty transcript.", - status.HTTP_400_BAD_REQUEST, - ) - - voice_session.status = VoiceTaskSession.Status.TRANSCRIBED - voice_session.transcript = transcript - voice_session.save(update_fields=["status", "transcript", "updated_at"]) - - parser_context = build_voice_task_parser_context( - workspace=workspace, - user=request.user, - transcript=transcript, - client_context=client_context, - ) - - voice_session.status = VoiceTaskSession.Status.PARSING - voice_session.save(update_fields=["status", "updated_at"]) - - parsed = VoiceTaskParserService( - api_key=api_key, - model=ai_settings.structuring_model, - ).parse(parser_context) - if not parsed.get("state_hint"): - inferred_state_hint = infer_voice_task_state_hint(transcript) - if inferred_state_hint: - parsed["state_hint"] = inferred_state_hint - parsed = harden_voice_task_intent(parsed, transcript) - - warnings = get_voice_task_warnings(parsed, transcript) - resolution = build_voice_task_resolution( - workspace=workspace, - user=request.user, - ai_settings=ai_settings, - draft=parsed, - client_context=client_context, - transcript=transcript, - ) - warnings = list(dict.fromkeys(warnings + resolution["warnings"])) - requires_confirmation = voice_task_requires_confirmation(parsed, warnings) - - voice_session.status = VoiceTaskSession.Status.PARSED - voice_session.intent = parsed["intent"] - voice_session.parsed_json = parsed - if not voice_session.project_id and parsed.get("project_id"): - voice_session.project = get_voice_task_quota_project(workspace, project_id=parsed.get("project_id")) - voice_session.save(update_fields=["status", "intent", "parsed_json", "project", "updated_at"]) - + if not voice_session: return Response( - { - "ok": True, - "status": "parsed", - "pipeline_status": "parsed", - "voice_session_id": str(voice_session.id), - "transcript": transcript, - "intent": parsed["intent"], - "draft": parsed, - "resolution": resolution, - "warnings": warnings, - "requires_confirmation": requires_confirmation, - "models": { - "transcription": ai_settings.transcription_model, - "structuring": ai_settings.structuring_model, - }, - "audio": { - "content_type": audio_content_type, - "duration_seconds": duration_seconds, - "size": audio.size, - }, - "client_context": client_context, - }, - status=status.HTTP_200_OK, + {"ok": False, "code": "voice_session_not_found", "error": "Voice Task session was not found."}, + status=status.HTTP_404_NOT_FOUND, ) - except VoiceTaskerPipelineError as exc: - pipeline_error = exc - except Exception as exc: - pipeline_error = get_openai_pipeline_error(exc) - voice_session.status = VoiceTaskSession.Status.FAILED - voice_session.error_code = pipeline_error.code - voice_session.error_message = pipeline_error.message - voice_session.save(update_fields=["status", "error_code", "error_message", "updated_at"]) - - return Response( - { - "ok": False, - "voice_session_id": str(voice_session.id), - "code": pipeline_error.code, - "error": pipeline_error.message, - }, - status=pipeline_error.response_status, - ) + response_status = status.HTTP_400_BAD_REQUEST if voice_session.status == VoiceTaskSession.Status.FAILED else status.HTTP_200_OK + return Response(serialize_voice_task_session_response(voice_session), status=response_status) class VoiceTaskCommitEndpoint(BaseAPIView): diff --git a/plane-src/apps/api/plane/bgtasks/voice_tasker_task.py b/plane-src/apps/api/plane/bgtasks/voice_tasker_task.py new file mode 100644 index 0000000..02c3eed --- /dev/null +++ b/plane-src/apps/api/plane/bgtasks/voice_tasker_task.py @@ -0,0 +1,12 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. + +from celery import shared_task + + +@shared_task +def process_voice_task_session(voice_session_id): + from plane.app.views.voice_tasker import process_voice_task_session_pipeline + + process_voice_task_session_pipeline(voice_session_id) diff --git a/plane-src/apps/api/plane/db/migrations/0132_voice_tasker_queue_concurrency.py b/plane-src/apps/api/plane/db/migrations/0132_voice_tasker_queue_concurrency.py new file mode 100644 index 0000000..c783e1f --- /dev/null +++ b/plane-src/apps/api/plane/db/migrations/0132_voice_tasker_queue_concurrency.py @@ -0,0 +1,40 @@ +# Generated for NODEDC Voice Tasker queue lifecycle. + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("db", "0131_voice_tasker_daily_project_limits"), + ] + + operations = [ + migrations.AddField( + model_name="workspaceaisettings", + name="workspace_concurrency_limit", + field=models.PositiveIntegerField(default=3), + ), + migrations.AddField( + model_name="voicetasksession", + name="audio_file", + field=models.FileField(blank=True, null=True, upload_to="voice-task-sessions/%Y/%m/%d/"), + ), + migrations.AlterField( + model_name="voicetasksession", + name="status", + field=models.CharField( + choices=[ + ("queued", "Queued"), + ("processing", "Processing"), + ("uploaded", "Uploaded"), + ("transcribing", "Transcribing"), + ("transcribed", "Transcribed"), + ("parsing", "Parsing"), + ("parsed", "Parsed"), + ("failed", "Failed"), + ], + default="uploaded", + max_length=32, + ), + ), + ] diff --git a/plane-src/apps/api/plane/db/models/voice_tasker.py b/plane-src/apps/api/plane/db/models/voice_tasker.py index 32f4e27..05adc78 100644 --- a/plane-src/apps/api/plane/db/models/voice_tasker.py +++ b/plane-src/apps/api/plane/db/models/voice_tasker.py @@ -55,6 +55,7 @@ class WorkspaceAISettings(BaseModel): per_user_daily_limit = models.PositiveIntegerField(default=100) workspace_daily_limit = models.PositiveIntegerField(default=1000) project_daily_limit = models.PositiveIntegerField(default=300) + workspace_concurrency_limit = models.PositiveIntegerField(default=3) class Meta: verbose_name = "Workspace AI Settings" @@ -92,6 +93,8 @@ class WorkspaceAICredential(BaseModel): class VoiceTaskSession(BaseModel): class Status(models.TextChoices): + QUEUED = "queued", "Queued" + PROCESSING = "processing", "Processing" UPLOADED = "uploaded", "Uploaded" TRANSCRIBING = "transcribing", "Transcribing" TRANSCRIBED = "transcribed", "Transcribed" @@ -117,6 +120,7 @@ class VoiceTaskSession(BaseModel): related_name="voice_task_sessions", ) status = models.CharField(max_length=32, choices=Status.choices, default=Status.UPLOADED) + audio_file = models.FileField(upload_to="voice-task-sessions/%Y/%m/%d/", null=True, blank=True) audio_duration_seconds = models.FloatField(null=True, blank=True) audio_content_type = models.CharField(max_length=100, blank=True) audio_size = models.PositiveIntegerField(null=True, blank=True) diff --git a/plane-src/apps/api/plane/settings/common.py b/plane-src/apps/api/plane/settings/common.py index 4ceaa20..952efe3 100644 --- a/plane-src/apps/api/plane/settings/common.py +++ b/plane-src/apps/api/plane/settings/common.py @@ -302,6 +302,7 @@ CELERY_IMPORTS = ( # issue version tasks "plane.bgtasks.issue_version_sync", "plane.bgtasks.issue_description_version_sync", + "plane.bgtasks.voice_tasker_task", ) FILE_SIZE_LIMIT = int(os.environ.get("FILE_SIZE_LIMIT", 5242880)) diff --git a/plane-src/apps/web/core/components/voice-tasker/global-control.tsx b/plane-src/apps/web/core/components/voice-tasker/global-control.tsx index 353220a..ca6c925 100644 --- a/plane-src/apps/web/core/components/voice-tasker/global-control.tsx +++ b/plane-src/apps/web/core/components/voice-tasker/global-control.tsx @@ -64,6 +64,8 @@ const workspaceAIService = new WorkspaceAIService(); const projectService = new ProjectService(); type TVoiceTaskerStatus = "idle" | "recording" | "uploading" | "success" | "committing" | "committed" | "error"; +const VOICE_TASK_SESSION_POLL_INTERVAL_MS = 1200; +const VOICE_TASK_SESSION_POLL_LIMIT = 250; const UNAVAILABLE_LABELS = { disabled: "AI-функции не активированы для этого workspace", @@ -158,6 +160,10 @@ function getVoiceTaskErrorMessage(error: unknown, fallback: string) { return `Суточный лимит Voice Tasker${projectName} исчерпан. Повторите ${retryAfter}.${usageText}`; } + if (code === "voice_task_workspace_concurrency_limit_exceeded") { + return `Очередь Voice Tasker занята. Повторите ${retryAfter}.`; + } + if ("error" in error && error.error) return String(error.error); } @@ -211,6 +217,18 @@ function getVoiceTaskWarnings(result: TVoiceTaskUploadResult) { ); } +function isVoiceTaskParsed(result: TVoiceTaskUploadResult) { + return result.status === "parsed" || result.pipeline_status === "parsed"; +} + +function isVoiceTaskFailed(result: TVoiceTaskUploadResult) { + return result.status === "failed" || result.pipeline_status === "failed" || result.ok === false; +} + +function wait(milliseconds: number) { + return new Promise((resolve) => window.setTimeout(resolve, milliseconds)); +} + type TVoiceTaskResolutionWithAssignees = NonNullable & { assignees?: NonNullable["assignee"][]; }; @@ -1029,6 +1047,24 @@ export function VoiceTaskerGlobalControl({ workspaceSlug }: Props) { } }; + const pollVoiceTaskSession = useCallback( + async (sessionId: string) => { + for (let attempt = 0; attempt < VOICE_TASK_SESSION_POLL_LIMIT; attempt += 1) { + await wait(VOICE_TASK_SESSION_POLL_INTERVAL_MS); + const result = await workspaceAIService.retrieveVoiceTaskSession(workspaceSlug, sessionId); + + if (isVoiceTaskFailed(result)) throw result; + if (isVoiceTaskParsed(result)) return result; + } + + throw { + code: "voice_task_poll_timeout", + error: "Voice Tasker слишком долго формирует карточку. Попробуйте еще раз.", + }; + }, + [workspaceSlug] + ); + const uploadAudio = async () => { if (!audioBlob) return; @@ -1054,8 +1090,13 @@ export function VoiceTaskerGlobalControl({ workspaceSlug }: Props) { ); try { + const queuedResult = await workspaceAIService.uploadVoiceTaskAudio(workspaceSlug, formData); const result = hydrateVoiceTaskDraftFromResolution( - await workspaceAIService.uploadVoiceTaskAudio(workspaceSlug, formData) + isVoiceTaskParsed(queuedResult) + ? queuedResult + : queuedResult.voice_session_id + ? await pollVoiceTaskSession(queuedResult.voice_session_id) + : queuedResult ); setParseResult(result); setSelectedTargetIssue(getTargetOptionFromResolution(result)); diff --git a/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx b/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx index 8c5051b..8387766 100644 --- a/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx +++ b/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx @@ -49,6 +49,7 @@ type TFormState = { per_user_daily_limit: number; workspace_daily_limit: number; project_daily_limit: number; + workspace_concurrency_limit: number; openai_api_key: string; }; @@ -71,6 +72,7 @@ const getInitialFormState = (settings?: TWorkspaceAISettings): TFormState => ({ per_user_daily_limit: settings?.per_user_daily_limit ?? 100, workspace_daily_limit: settings?.workspace_daily_limit ?? 1000, project_daily_limit: settings?.project_daily_limit ?? 300, + workspace_concurrency_limit: settings?.workspace_concurrency_limit ?? 3, openai_api_key: "", }); @@ -193,6 +195,7 @@ export const AIVoiceTaskerSettingsContent = observer(function AIVoiceTaskerSetti per_user_daily_limit: formState.per_user_daily_limit, workspace_daily_limit: formState.workspace_daily_limit, project_daily_limit: formState.project_daily_limit, + workspace_concurrency_limit: formState.workspace_concurrency_limit, }; if (formState.openai_api_key.trim()) payload.openai_api_key = formState.openai_api_key.trim(); @@ -410,6 +413,15 @@ export const AIVoiceTaskerSettingsContent = observer(function AIVoiceTaskerSetti onChange={(value) => updateFormValue("project_daily_limit", value)} /> + + updateFormValue("workspace_concurrency_limit", value)} + /> + diff --git a/plane-src/apps/web/core/services/workspace-ai.service.ts b/plane-src/apps/web/core/services/workspace-ai.service.ts index d3198bc..6abf5c6 100644 --- a/plane-src/apps/web/core/services/workspace-ai.service.ts +++ b/plane-src/apps/web/core/services/workspace-ai.service.ts @@ -80,6 +80,14 @@ export class WorkspaceAIService extends APIService { }); } + async retrieveVoiceTaskSession(workspaceSlug: string, sessionId: string): Promise { + return this.get(`/api/workspaces/${workspaceSlug}/voice-task/sessions/${sessionId}/`) + .then((response) => response?.data) + .catch((error) => { + throw error?.response?.data; + }); + } + async commitVoiceTask( workspaceSlug: string, data: { diff --git a/plane-src/packages/types/src/ai.ts b/plane-src/packages/types/src/ai.ts index 3d54723..a5cbb28 100644 --- a/plane-src/packages/types/src/ai.ts +++ b/plane-src/packages/types/src/ai.ts @@ -46,6 +46,7 @@ export type TWorkspaceAISettings = { per_user_daily_limit: number; workspace_daily_limit: number; project_daily_limit: number; + workspace_concurrency_limit: number; credential: TWorkspaceAICredentialStatus; created_at: string; updated_at: string; @@ -67,6 +68,7 @@ export type TWorkspaceAISettingsPayload = Partial< | "per_user_daily_limit" | "workspace_daily_limit" | "project_daily_limit" + | "workspace_concurrency_limit" > > & { openai_api_key?: string; @@ -179,8 +181,8 @@ export type TVoiceTaskResolvedAssignee = { export type TVoiceTaskUploadResult = { ok: boolean; - status?: "uploaded" | "parsed"; - pipeline_status?: "pending_openai_pipeline" | "parsed"; + status?: "queued" | "processing" | "uploaded" | "transcribing" | "transcribed" | "parsing" | "parsed" | "failed"; + pipeline_status?: "queued" | "processing" | "uploaded" | "transcribing" | "transcribed" | "parsing" | "parsed" | "failed"; voice_session_id?: string; transcript?: string; intent?: TVoiceTaskIntent;