ФУНКЦИИ - МЕЖПРОЕКТНАЯ КОММУНИКАЦИЯ: очередь Voice Tasker и concurrency control

This commit is contained in:
DCCONSTRUCTIONS 2026-04-28 15:43:01 +03:00
parent a0213db2fc
commit 3b295e33f3
12 changed files with 530 additions and 144 deletions

View File

@ -36,6 +36,7 @@ class WorkspaceAISettingsSerializer(BaseSerializer):
"per_user_daily_limit",
"workspace_daily_limit",
"project_daily_limit",
"workspace_concurrency_limit",
"credential",
"openai_api_key",
"created_at",
@ -123,6 +124,11 @@ class WorkspaceAISettingsSerializer(BaseSerializer):
raise serializers.ValidationError("Project daily limit must be between 1 and 50000.")
return value
def validate_workspace_concurrency_limit(self, value):
if value < 1 or value > 50:
raise serializers.ValidationError("Workspace concurrency limit must be between 1 and 50.")
return value
def update(self, instance, validated_data):
api_key = validated_data.pop("openai_api_key", None)
default_project_id = validated_data.pop("default_project_id", serializers.empty)

View File

@ -8,6 +8,7 @@ from plane.app.views import (
VoiceTaskCommitEndpoint,
VoiceTaskParseEndpoint,
VoiceTaskPreflightEndpoint,
VoiceTaskSessionEndpoint,
WorkspaceAISettingsEndpoint,
WorkspaceAISettingsTestConnectionEndpoint,
)
@ -34,6 +35,11 @@ urlpatterns = [
VoiceTaskParseEndpoint.as_view(),
name="voice-task-parse",
),
path(
"workspaces/<str:slug>/voice-task/sessions/<uuid:session_id>/",
VoiceTaskSessionEndpoint.as_view(),
name="voice-task-session",
),
path(
"workspaces/<str:slug>/voice-task/commit/",
VoiceTaskCommitEndpoint.as_view(),

View File

@ -253,6 +253,7 @@ from .voice_tasker import (
VoiceTaskCommitEndpoint,
VoiceTaskParseEndpoint,
VoiceTaskPreflightEndpoint,
VoiceTaskSessionEndpoint,
WorkspaceAISettingsEndpoint,
WorkspaceAISettingsTestConnectionEndpoint,
)

View File

@ -7,14 +7,17 @@ import json
import math
import re
import uuid
import zlib
from datetime import date, timedelta
from difflib import SequenceMatcher
from html import escape
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from django.core.files.base import ContentFile
from django.core.serializers.json import DjangoJSONEncoder
from django.db import connection, transaction
from django.db.models import Max, Q
from django.utils.text import get_valid_filename
from django.utils import timezone
from openai import OpenAI
@ -63,6 +66,8 @@ VOICE_TASK_MEMORY_LIMIT = 5
VOICE_TASK_CONTEXT_LIMIT = 100
VOICE_TASK_RATE_LIMIT_HOURLY_WINDOW_SECONDS = 60 * 60
VOICE_TASK_RATE_LIMIT_DAILY_WINDOW_SECONDS = 24 * 60 * 60
VOICE_TASK_CONCURRENCY_RETRY_AFTER_SECONDS = 15
VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS = 30 * 60
VOICE_TASK_RATE_LIMIT_ERROR_CODES = {
"voice_task_user_hourly_limit_exceeded",
"voice_task_workspace_hourly_limit_exceeded",
@ -70,6 +75,16 @@ VOICE_TASK_RATE_LIMIT_ERROR_CODES = {
"voice_task_workspace_daily_limit_exceeded",
"voice_task_project_daily_limit_exceeded",
}
VOICE_TASK_CONCURRENCY_ERROR_CODES = {"voice_task_workspace_concurrency_limit_exceeded"}
VOICE_TASK_LIMIT_EXCLUDED_ERROR_CODES = VOICE_TASK_RATE_LIMIT_ERROR_CODES | VOICE_TASK_CONCURRENCY_ERROR_CODES
VOICE_TASK_ACTIVE_SESSION_STATUSES = {
VoiceTaskSession.Status.QUEUED,
VoiceTaskSession.Status.PROCESSING,
VoiceTaskSession.Status.UPLOADED,
VoiceTaskSession.Status.TRANSCRIBING,
VoiceTaskSession.Status.TRANSCRIBED,
VoiceTaskSession.Status.PARSING,
}
VOICE_TASK_PROJECT_MATCH_THRESHOLD = 0.8
VOICE_TASK_ASSIGNEE_MATCH_THRESHOLD = 0.8
VOICE_TASK_STATE_MATCH_THRESHOLD = 0.8
@ -345,7 +360,7 @@ def get_voice_task_limit_sessions(workspace, window_seconds, now=None):
workspace=workspace,
created_at__gte=window_start,
created_at__lte=now,
).exclude(error_code__in=VOICE_TASK_RATE_LIMIT_ERROR_CODES)
).exclude(error_code__in=VOICE_TASK_LIMIT_EXCLUDED_ERROR_CODES)
def get_voice_task_rate_limit_state(workspace, user, project, ai_settings, now=None):
@ -480,6 +495,74 @@ def get_voice_task_rate_limit_error(workspace, user, project, ai_settings, now=N
return None
def get_voice_task_workspace_lock_key(workspace_id):
return zlib.crc32(f"voice-tasker:{workspace_id}".encode("utf-8"))
def get_voice_task_audio_filename(audio, audio_content_type):
original_name = get_valid_filename(getattr(audio, "name", "") or "")
if original_name:
return f"{uuid.uuid4()}-{original_name}"
extension = {
"audio/webm": "webm",
"audio/mp4": "m4a",
"audio/mpeg": "mp3",
"audio/wav": "wav",
}.get(audio_content_type, "webm")
return f"{uuid.uuid4()}.{extension}"
def save_voice_task_audio_file(voice_session, audio, audio_content_type):
audio.seek(0)
voice_session.audio_file.save(
get_voice_task_audio_filename(audio, audio_content_type),
ContentFile(audio.read()),
save=False,
)
def clear_voice_task_audio_file(voice_session):
if not voice_session.audio_file:
return
voice_session.audio_file.delete(save=False)
voice_session.audio_file = None
voice_session.save(update_fields=["audio_file", "updated_at"])
def get_voice_task_concurrency_state(workspace, ai_settings):
active_sessions = VoiceTaskSession.objects.filter(
workspace=workspace,
status__in=VOICE_TASK_ACTIVE_SESSION_STATUSES,
updated_at__gte=timezone.now() - timedelta(seconds=VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS),
).count()
concurrency_limit = max(int(ai_settings.workspace_concurrency_limit or 0), 0)
return {
"scope": "workspace",
"limit": concurrency_limit,
"used": active_sessions,
"exceeded": bool(concurrency_limit and active_sessions >= concurrency_limit),
"retry_after": VOICE_TASK_CONCURRENCY_RETRY_AFTER_SECONDS,
}
def get_voice_task_concurrency_error(workspace, ai_settings):
state = get_voice_task_concurrency_state(workspace, ai_settings)
if not state["exceeded"]:
return None
return {
"code": "voice_task_workspace_concurrency_limit_exceeded",
"message": "Voice Tasker processing queue is full for this workspace.",
"scope": state["scope"],
"limit": state["limit"],
"used": state["used"],
"retry_after": state["retry_after"],
}
def create_voice_task_rate_limit_session(
workspace,
user,
@ -504,6 +587,96 @@ def create_voice_task_rate_limit_session(
)
def create_voice_task_failed_preflight_session(
workspace,
user,
audio,
audio_content_type,
duration_seconds,
client_context,
error,
project=None,
):
return VoiceTaskSession.objects.create(
workspace=workspace,
user=user,
project=project,
status=VoiceTaskSession.Status.FAILED,
audio_duration_seconds=duration_seconds,
audio_content_type=audio_content_type,
audio_size=getattr(audio, "size", None),
client_context=client_context,
error_code=error["code"],
error_message=error["message"],
)
def reserve_voice_task_session(
workspace,
user,
audio,
audio_content_type,
duration_seconds,
client_context,
request_project_id=None,
):
with transaction.atomic():
with connection.cursor() as cursor:
cursor.execute("SELECT pg_advisory_xact_lock(%s)", [get_voice_task_workspace_lock_key(workspace.id)])
ai_settings = WorkspaceAISettings.objects.select_for_update().filter(workspace=workspace).first()
if not ai_settings:
raise VoiceTaskerPipelineError(
"not_configured",
"Voice Tasker is not configured for this workspace.",
status.HTTP_400_BAD_REQUEST,
)
quota_project = get_voice_task_quota_project(workspace, project_id=request_project_id, ai_settings=ai_settings)
rate_limit_error = get_voice_task_rate_limit_error(workspace, user, quota_project, ai_settings)
if rate_limit_error:
voice_session = create_voice_task_rate_limit_session(
workspace=workspace,
user=user,
audio=audio,
audio_content_type=audio_content_type,
duration_seconds=duration_seconds,
client_context=client_context,
rate_limit_error=rate_limit_error,
project=quota_project,
)
return None, voice_session, rate_limit_error
concurrency_error = get_voice_task_concurrency_error(workspace, ai_settings)
if concurrency_error:
voice_session = create_voice_task_failed_preflight_session(
workspace=workspace,
user=user,
audio=audio,
audio_content_type=audio_content_type,
duration_seconds=duration_seconds,
client_context=client_context,
error=concurrency_error,
project=quota_project,
)
return None, voice_session, concurrency_error
voice_session = VoiceTaskSession(
workspace=workspace,
user=user,
project=quota_project,
status=VoiceTaskSession.Status.QUEUED,
audio_duration_seconds=duration_seconds,
audio_content_type=audio_content_type,
audio_size=getattr(audio, "size", None),
client_context=client_context,
)
save_voice_task_audio_file(voice_session, audio, audio_content_type)
voice_session.save()
return voice_session, voice_session, None
class VoiceTaskerPipelineError(Exception):
def __init__(self, code, message, response_status=status.HTTP_400_BAD_REQUEST):
self.code = code
@ -573,10 +746,11 @@ class OpenAITranscriptionService:
self.client = OpenAI(api_key=api_key)
self.model = model
def transcribe(self, audio, language=None):
def transcribe(self, audio, language=None, content_type=None):
audio.seek(0)
file_name = audio.name or "voice-task.webm"
payload = (file_name, audio.read(), normalize_audio_content_type(audio.content_type) or "audio/webm")
normalized_content_type = normalize_audio_content_type(content_type or getattr(audio, "content_type", ""))
payload = (file_name, audio.read(), normalized_content_type or "audio/webm")
params = {
"model": self.model,
"file": payload,
@ -2473,6 +2647,167 @@ def voice_task_requires_confirmation(parsed, warnings):
)
def process_voice_task_session_pipeline(voice_session_id):
voice_session = VoiceTaskSession.objects.select_related("workspace", "user", "project").filter(
id=voice_session_id
).first()
if not voice_session:
return
if voice_session.status == VoiceTaskSession.Status.PARSED:
return
try:
if not voice_session.audio_file:
raise VoiceTaskerPipelineError(
"missing_audio",
"Voice Tasker audio file is not available for processing.",
status.HTTP_400_BAD_REQUEST,
)
workspace = voice_session.workspace
user = voice_session.user
client_context = voice_session.client_context or {}
ai_settings, api_key = get_workspace_ai_runtime(workspace)
voice_session.status = VoiceTaskSession.Status.PROCESSING
voice_session.save(update_fields=["status", "updated_at"])
voice_session.status = VoiceTaskSession.Status.TRANSCRIBING
voice_session.save(update_fields=["status", "updated_at"])
voice_session.audio_file.open("rb")
try:
transcript = OpenAITranscriptionService(
api_key=api_key,
model=ai_settings.transcription_model,
).transcribe(
voice_session.audio_file,
language=get_client_language(client_context),
content_type=voice_session.audio_content_type,
)
finally:
voice_session.audio_file.close()
if not transcript:
raise VoiceTaskerPipelineError(
"empty_transcript",
"OpenAI returned an empty transcript.",
status.HTTP_400_BAD_REQUEST,
)
voice_session.status = VoiceTaskSession.Status.TRANSCRIBED
voice_session.transcript = transcript
voice_session.save(update_fields=["status", "transcript", "updated_at"])
parser_context = build_voice_task_parser_context(
workspace=workspace,
user=user,
transcript=transcript,
client_context=client_context,
)
voice_session.status = VoiceTaskSession.Status.PARSING
voice_session.save(update_fields=["status", "updated_at"])
parsed = VoiceTaskParserService(
api_key=api_key,
model=ai_settings.structuring_model,
).parse(parser_context)
if not parsed.get("state_hint"):
inferred_state_hint = infer_voice_task_state_hint(transcript)
if inferred_state_hint:
parsed["state_hint"] = inferred_state_hint
parsed = harden_voice_task_intent(parsed, transcript)
voice_session.status = VoiceTaskSession.Status.PARSED
voice_session.intent = parsed["intent"]
voice_session.parsed_json = parsed
if not voice_session.project_id and parsed.get("project_id"):
voice_session.project = get_voice_task_quota_project(workspace, project_id=parsed.get("project_id"))
voice_session.error_code = ""
voice_session.error_message = ""
voice_session.save(
update_fields=["status", "intent", "parsed_json", "project", "error_code", "error_message", "updated_at"]
)
except VoiceTaskerPipelineError as exc:
pipeline_error = exc
voice_session.status = VoiceTaskSession.Status.FAILED
voice_session.error_code = pipeline_error.code
voice_session.error_message = pipeline_error.message
voice_session.save(update_fields=["status", "error_code", "error_message", "updated_at"])
except Exception as exc:
pipeline_error = get_openai_pipeline_error(exc)
voice_session.status = VoiceTaskSession.Status.FAILED
voice_session.error_code = pipeline_error.code
voice_session.error_message = pipeline_error.message
voice_session.save(update_fields=["status", "error_code", "error_message", "updated_at"])
finally:
try:
clear_voice_task_audio_file(voice_session)
except Exception as exc:
log_exception(exc)
def serialize_voice_task_session_response(voice_session):
base_payload = {
"ok": voice_session.status != VoiceTaskSession.Status.FAILED,
"status": voice_session.status,
"pipeline_status": voice_session.status,
"voice_session_id": str(voice_session.id),
"audio": {
"content_type": voice_session.audio_content_type,
"duration_seconds": voice_session.audio_duration_seconds,
"size": voice_session.audio_size,
},
"client_context": voice_session.client_context,
}
if voice_session.status == VoiceTaskSession.Status.FAILED:
base_payload.update(
{
"code": voice_session.error_code,
"error": voice_session.error_message or "Voice Tasker failed to process audio.",
}
)
return base_payload
if voice_session.status != VoiceTaskSession.Status.PARSED:
return base_payload
workspace = voice_session.workspace
ai_settings = WorkspaceAISettings.objects.filter(workspace=workspace).first()
parsed = normalize_voice_task_parse(voice_session.parsed_json or {})
transcript = voice_session.transcript
warnings = get_voice_task_warnings(parsed, transcript)
resolution = build_voice_task_resolution(
workspace=workspace,
user=voice_session.user,
ai_settings=ai_settings,
draft=parsed,
client_context=voice_session.client_context or {},
voice_session=voice_session,
transcript=transcript,
)
warnings = list(dict.fromkeys(warnings + resolution["warnings"]))
base_payload.update(
{
"transcript": transcript,
"intent": parsed["intent"],
"draft": parsed,
"resolution": resolution,
"warnings": warnings,
"requires_confirmation": voice_task_requires_confirmation(parsed, warnings),
"models": {
"transcription": ai_settings.transcription_model if ai_settings else "",
"structuring": ai_settings.structuring_model if ai_settings else "",
},
}
)
return base_payload
class WorkspaceAISettingsEndpoint(BaseAPIView):
def get_settings(self, slug):
workspace = Workspace.objects.get(slug=slug)
@ -2637,163 +2972,81 @@ class VoiceTaskParseEndpoint(BaseAPIView):
if not isinstance(client_context, dict):
client_context = {}
ai_settings = WorkspaceAISettings.objects.filter(workspace=workspace).first()
if not ai_settings:
return Response(
{"ok": False, "code": "not_configured", "error": "Voice Tasker is not configured for this workspace."},
status=status.HTTP_400_BAD_REQUEST,
)
quota_project = get_voice_task_quota_project(workspace, project_id=request_project_id, ai_settings=ai_settings)
rate_limit_error = get_voice_task_rate_limit_error(workspace, request.user, quota_project, ai_settings)
if rate_limit_error:
voice_session = create_voice_task_rate_limit_session(
try:
voice_session, recorded_session, preflight_error = reserve_voice_task_session(
workspace=workspace,
user=request.user,
audio=audio,
audio_content_type=audio_content_type,
duration_seconds=duration_seconds,
client_context=client_context,
rate_limit_error=rate_limit_error,
project=quota_project,
)
return Response(
{
"ok": False,
"voice_session_id": str(voice_session.id),
"code": rate_limit_error["code"],
"error": rate_limit_error["message"],
"limit_scope": rate_limit_error["scope"],
"limit_window": rate_limit_error["window"],
"limit": rate_limit_error["limit"],
"used": rate_limit_error["used"],
"window_seconds": rate_limit_error["window_seconds"],
"retry_after": rate_limit_error["retry_after"],
"reset_at": rate_limit_error["reset_at"].isoformat(),
"project_id": rate_limit_error["project_id"],
"project_identifier": rate_limit_error["project_identifier"],
"project_name": rate_limit_error["project_name"],
},
status=status.HTTP_429_TOO_MANY_REQUESTS,
)
voice_session = VoiceTaskSession.objects.create(
workspace=workspace,
user=request.user,
project=quota_project,
status=VoiceTaskSession.Status.UPLOADED,
audio_duration_seconds=duration_seconds,
audio_content_type=audio_content_type,
audio_size=audio.size,
client_context=client_context,
)
try:
ai_settings, api_key = get_workspace_ai_runtime(workspace)
voice_session.status = VoiceTaskSession.Status.TRANSCRIBING
voice_session.save(update_fields=["status", "updated_at"])
transcript = OpenAITranscriptionService(
api_key=api_key,
model=ai_settings.transcription_model,
).transcribe(audio, language=get_client_language(client_context))
if not transcript:
raise VoiceTaskerPipelineError(
"empty_transcript",
"OpenAI returned an empty transcript.",
status.HTTP_400_BAD_REQUEST,
)
voice_session.status = VoiceTaskSession.Status.TRANSCRIBED
voice_session.transcript = transcript
voice_session.save(update_fields=["status", "transcript", "updated_at"])
parser_context = build_voice_task_parser_context(
workspace=workspace,
user=request.user,
transcript=transcript,
client_context=client_context,
)
voice_session.status = VoiceTaskSession.Status.PARSING
voice_session.save(update_fields=["status", "updated_at"])
parsed = VoiceTaskParserService(
api_key=api_key,
model=ai_settings.structuring_model,
).parse(parser_context)
if not parsed.get("state_hint"):
inferred_state_hint = infer_voice_task_state_hint(transcript)
if inferred_state_hint:
parsed["state_hint"] = inferred_state_hint
parsed = harden_voice_task_intent(parsed, transcript)
warnings = get_voice_task_warnings(parsed, transcript)
resolution = build_voice_task_resolution(
workspace=workspace,
user=request.user,
ai_settings=ai_settings,
draft=parsed,
client_context=client_context,
transcript=transcript,
)
warnings = list(dict.fromkeys(warnings + resolution["warnings"]))
requires_confirmation = voice_task_requires_confirmation(parsed, warnings)
voice_session.status = VoiceTaskSession.Status.PARSED
voice_session.intent = parsed["intent"]
voice_session.parsed_json = parsed
if not voice_session.project_id and parsed.get("project_id"):
voice_session.project = get_voice_task_quota_project(workspace, project_id=parsed.get("project_id"))
voice_session.save(update_fields=["status", "intent", "parsed_json", "project", "updated_at"])
return Response(
{
"ok": True,
"status": "parsed",
"pipeline_status": "parsed",
"voice_session_id": str(voice_session.id),
"transcript": transcript,
"intent": parsed["intent"],
"draft": parsed,
"resolution": resolution,
"warnings": warnings,
"requires_confirmation": requires_confirmation,
"models": {
"transcription": ai_settings.transcription_model,
"structuring": ai_settings.structuring_model,
},
"audio": {
"content_type": audio_content_type,
"duration_seconds": duration_seconds,
"size": audio.size,
},
"client_context": client_context,
},
status=status.HTTP_200_OK,
request_project_id=request_project_id,
)
except VoiceTaskerPipelineError as exc:
pipeline_error = exc
return Response({"ok": False, "code": exc.code, "error": exc.message}, status=exc.response_status)
if preflight_error:
error_payload = {
"ok": False,
"voice_session_id": str(recorded_session.id),
"code": preflight_error["code"],
"error": preflight_error["message"],
"limit_scope": preflight_error.get("scope"),
"limit_window": preflight_error.get("window"),
"limit": preflight_error.get("limit"),
"used": preflight_error.get("used"),
"window_seconds": preflight_error.get("window_seconds"),
"retry_after": preflight_error.get("retry_after"),
"project_id": preflight_error.get("project_id"),
"project_identifier": preflight_error.get("project_identifier"),
"project_name": preflight_error.get("project_name"),
}
if preflight_error.get("reset_at"):
error_payload["reset_at"] = preflight_error["reset_at"].isoformat()
return Response(error_payload, status=status.HTTP_429_TOO_MANY_REQUESTS)
from plane.bgtasks.voice_tasker_task import process_voice_task_session
try:
process_voice_task_session.delay(str(voice_session.id))
except Exception as exc:
pipeline_error = get_openai_pipeline_error(exc)
log_exception(exc)
voice_session.status = VoiceTaskSession.Status.FAILED
voice_session.error_code = pipeline_error.code
voice_session.error_message = pipeline_error.message
voice_session.error_code = "voice_task_queue_unavailable"
voice_session.error_message = "Voice Tasker queue is not available."
voice_session.save(update_fields=["status", "error_code", "error_message", "updated_at"])
clear_voice_task_audio_file(voice_session)
return Response(
{
"ok": False,
"voice_session_id": str(voice_session.id),
"code": pipeline_error.code,
"error": pipeline_error.message,
"code": voice_session.error_code,
"error": voice_session.error_message,
},
status=pipeline_error.response_status,
status=status.HTTP_503_SERVICE_UNAVAILABLE,
)
return Response(serialize_voice_task_session_response(voice_session), status=status.HTTP_202_ACCEPTED)
class VoiceTaskSessionEndpoint(BaseAPIView):
@allow_permission(allowed_roles=[ROLE.ADMIN, ROLE.MEMBER, ROLE.GUEST], level="WORKSPACE")
def get(self, request, slug, session_id):
workspace = Workspace.objects.get(slug=slug)
voice_session = (
VoiceTaskSession.objects.select_related("workspace", "user", "project")
.filter(workspace=workspace, user=request.user, id=session_id)
.first()
)
if not voice_session:
return Response(
{"ok": False, "code": "voice_session_not_found", "error": "Voice Task session was not found."},
status=status.HTTP_404_NOT_FOUND,
)
response_status = status.HTTP_400_BAD_REQUEST if voice_session.status == VoiceTaskSession.Status.FAILED else status.HTTP_200_OK
return Response(serialize_voice_task_session_response(voice_session), status=response_status)
class VoiceTaskCommitEndpoint(BaseAPIView):
@allow_permission(allowed_roles=[ROLE.ADMIN, ROLE.MEMBER, ROLE.GUEST], level="WORKSPACE")

View File

@ -0,0 +1,12 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
from celery import shared_task
@shared_task
def process_voice_task_session(voice_session_id):
from plane.app.views.voice_tasker import process_voice_task_session_pipeline
process_voice_task_session_pipeline(voice_session_id)

View File

@ -0,0 +1,40 @@
# Generated for NODEDC Voice Tasker queue lifecycle.
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("db", "0131_voice_tasker_daily_project_limits"),
]
operations = [
migrations.AddField(
model_name="workspaceaisettings",
name="workspace_concurrency_limit",
field=models.PositiveIntegerField(default=3),
),
migrations.AddField(
model_name="voicetasksession",
name="audio_file",
field=models.FileField(blank=True, null=True, upload_to="voice-task-sessions/%Y/%m/%d/"),
),
migrations.AlterField(
model_name="voicetasksession",
name="status",
field=models.CharField(
choices=[
("queued", "Queued"),
("processing", "Processing"),
("uploaded", "Uploaded"),
("transcribing", "Transcribing"),
("transcribed", "Transcribed"),
("parsing", "Parsing"),
("parsed", "Parsed"),
("failed", "Failed"),
],
default="uploaded",
max_length=32,
),
),
]

View File

@ -55,6 +55,7 @@ class WorkspaceAISettings(BaseModel):
per_user_daily_limit = models.PositiveIntegerField(default=100)
workspace_daily_limit = models.PositiveIntegerField(default=1000)
project_daily_limit = models.PositiveIntegerField(default=300)
workspace_concurrency_limit = models.PositiveIntegerField(default=3)
class Meta:
verbose_name = "Workspace AI Settings"
@ -92,6 +93,8 @@ class WorkspaceAICredential(BaseModel):
class VoiceTaskSession(BaseModel):
class Status(models.TextChoices):
QUEUED = "queued", "Queued"
PROCESSING = "processing", "Processing"
UPLOADED = "uploaded", "Uploaded"
TRANSCRIBING = "transcribing", "Transcribing"
TRANSCRIBED = "transcribed", "Transcribed"
@ -117,6 +120,7 @@ class VoiceTaskSession(BaseModel):
related_name="voice_task_sessions",
)
status = models.CharField(max_length=32, choices=Status.choices, default=Status.UPLOADED)
audio_file = models.FileField(upload_to="voice-task-sessions/%Y/%m/%d/", null=True, blank=True)
audio_duration_seconds = models.FloatField(null=True, blank=True)
audio_content_type = models.CharField(max_length=100, blank=True)
audio_size = models.PositiveIntegerField(null=True, blank=True)

View File

@ -302,6 +302,7 @@ CELERY_IMPORTS = (
# issue version tasks
"plane.bgtasks.issue_version_sync",
"plane.bgtasks.issue_description_version_sync",
"plane.bgtasks.voice_tasker_task",
)
FILE_SIZE_LIMIT = int(os.environ.get("FILE_SIZE_LIMIT", 5242880))

View File

@ -64,6 +64,8 @@ const workspaceAIService = new WorkspaceAIService();
const projectService = new ProjectService();
type TVoiceTaskerStatus = "idle" | "recording" | "uploading" | "success" | "committing" | "committed" | "error";
const VOICE_TASK_SESSION_POLL_INTERVAL_MS = 1200;
const VOICE_TASK_SESSION_POLL_LIMIT = 250;
const UNAVAILABLE_LABELS = {
disabled: "AI-функции не активированы для этого workspace",
@ -158,6 +160,10 @@ function getVoiceTaskErrorMessage(error: unknown, fallback: string) {
return `Суточный лимит Voice Tasker${projectName} исчерпан. Повторите ${retryAfter}.${usageText}`;
}
if (code === "voice_task_workspace_concurrency_limit_exceeded") {
return `Очередь Voice Tasker занята. Повторите ${retryAfter}.`;
}
if ("error" in error && error.error) return String(error.error);
}
@ -211,6 +217,18 @@ function getVoiceTaskWarnings(result: TVoiceTaskUploadResult) {
);
}
function isVoiceTaskParsed(result: TVoiceTaskUploadResult) {
return result.status === "parsed" || result.pipeline_status === "parsed";
}
function isVoiceTaskFailed(result: TVoiceTaskUploadResult) {
return result.status === "failed" || result.pipeline_status === "failed" || result.ok === false;
}
function wait(milliseconds: number) {
return new Promise((resolve) => window.setTimeout(resolve, milliseconds));
}
type TVoiceTaskResolutionWithAssignees = NonNullable<TVoiceTaskUploadResult["resolution"]> & {
assignees?: NonNullable<TVoiceTaskUploadResult["resolution"]>["assignee"][];
};
@ -1029,6 +1047,24 @@ export function VoiceTaskerGlobalControl({ workspaceSlug }: Props) {
}
};
const pollVoiceTaskSession = useCallback(
async (sessionId: string) => {
for (let attempt = 0; attempt < VOICE_TASK_SESSION_POLL_LIMIT; attempt += 1) {
await wait(VOICE_TASK_SESSION_POLL_INTERVAL_MS);
const result = await workspaceAIService.retrieveVoiceTaskSession(workspaceSlug, sessionId);
if (isVoiceTaskFailed(result)) throw result;
if (isVoiceTaskParsed(result)) return result;
}
throw {
code: "voice_task_poll_timeout",
error: "Voice Tasker слишком долго формирует карточку. Попробуйте еще раз.",
};
},
[workspaceSlug]
);
const uploadAudio = async () => {
if (!audioBlob) return;
@ -1054,8 +1090,13 @@ export function VoiceTaskerGlobalControl({ workspaceSlug }: Props) {
);
try {
const queuedResult = await workspaceAIService.uploadVoiceTaskAudio(workspaceSlug, formData);
const result = hydrateVoiceTaskDraftFromResolution(
await workspaceAIService.uploadVoiceTaskAudio(workspaceSlug, formData)
isVoiceTaskParsed(queuedResult)
? queuedResult
: queuedResult.voice_session_id
? await pollVoiceTaskSession(queuedResult.voice_session_id)
: queuedResult
);
setParseResult(result);
setSelectedTargetIssue(getTargetOptionFromResolution(result));

View File

@ -49,6 +49,7 @@ type TFormState = {
per_user_daily_limit: number;
workspace_daily_limit: number;
project_daily_limit: number;
workspace_concurrency_limit: number;
openai_api_key: string;
};
@ -71,6 +72,7 @@ const getInitialFormState = (settings?: TWorkspaceAISettings): TFormState => ({
per_user_daily_limit: settings?.per_user_daily_limit ?? 100,
workspace_daily_limit: settings?.workspace_daily_limit ?? 1000,
project_daily_limit: settings?.project_daily_limit ?? 300,
workspace_concurrency_limit: settings?.workspace_concurrency_limit ?? 3,
openai_api_key: "",
});
@ -193,6 +195,7 @@ export const AIVoiceTaskerSettingsContent = observer(function AIVoiceTaskerSetti
per_user_daily_limit: formState.per_user_daily_limit,
workspace_daily_limit: formState.workspace_daily_limit,
project_daily_limit: formState.project_daily_limit,
workspace_concurrency_limit: formState.workspace_concurrency_limit,
};
if (formState.openai_api_key.trim()) payload.openai_api_key = formState.openai_api_key.trim();
@ -410,6 +413,15 @@ export const AIVoiceTaskerSettingsContent = observer(function AIVoiceTaskerSetti
onChange={(value) => updateFormValue("project_daily_limit", value)}
/>
</Field>
<Field label="Одновременные обработки workspace">
<NumberInput
value={formState.workspace_concurrency_limit}
min={1}
max={50}
suffix="job"
onChange={(value) => updateFormValue("workspace_concurrency_limit", value)}
/>
</Field>
</div>
</section>

View File

@ -80,6 +80,14 @@ export class WorkspaceAIService extends APIService {
});
}
async retrieveVoiceTaskSession(workspaceSlug: string, sessionId: string): Promise<TVoiceTaskUploadResult> {
return this.get(`/api/workspaces/${workspaceSlug}/voice-task/sessions/${sessionId}/`)
.then((response) => response?.data)
.catch((error) => {
throw error?.response?.data;
});
}
async commitVoiceTask(
workspaceSlug: string,
data: {

View File

@ -46,6 +46,7 @@ export type TWorkspaceAISettings = {
per_user_daily_limit: number;
workspace_daily_limit: number;
project_daily_limit: number;
workspace_concurrency_limit: number;
credential: TWorkspaceAICredentialStatus;
created_at: string;
updated_at: string;
@ -67,6 +68,7 @@ export type TWorkspaceAISettingsPayload = Partial<
| "per_user_daily_limit"
| "workspace_daily_limit"
| "project_daily_limit"
| "workspace_concurrency_limit"
>
> & {
openai_api_key?: string;
@ -179,8 +181,8 @@ export type TVoiceTaskResolvedAssignee = {
export type TVoiceTaskUploadResult = {
ok: boolean;
status?: "uploaded" | "parsed";
pipeline_status?: "pending_openai_pipeline" | "parsed";
status?: "queued" | "processing" | "uploaded" | "transcribing" | "transcribed" | "parsing" | "parsed" | "failed";
pipeline_status?: "queued" | "processing" | "uploaded" | "transcribing" | "transcribed" | "parsing" | "parsed" | "failed";
voice_session_id?: string;
transcript?: string;
intent?: TVoiceTaskIntent;