diff --git a/plane-src/apps/api/plane/app/serializers/voice_tasker.py b/plane-src/apps/api/plane/app/serializers/voice_tasker.py index 2a3abf6..81c09ed 100644 --- a/plane-src/apps/api/plane/app/serializers/voice_tasker.py +++ b/plane-src/apps/api/plane/app/serializers/voice_tasker.py @@ -37,6 +37,7 @@ class WorkspaceAISettingsSerializer(BaseSerializer): "workspace_daily_limit", "project_daily_limit", "workspace_concurrency_limit", + "sensitive_data_retention_days", "credential", "openai_api_key", "created_at", @@ -129,6 +130,11 @@ class WorkspaceAISettingsSerializer(BaseSerializer): raise serializers.ValidationError("Workspace concurrency limit must be between 1 and 50.") return value + def validate_sensitive_data_retention_days(self, value): + if value < 1 or value > 365: + raise serializers.ValidationError("Sensitive data retention must be between 1 and 365 days.") + return value + def update(self, instance, validated_data): api_key = validated_data.pop("openai_api_key", None) default_project_id = validated_data.pop("default_project_id", serializers.empty) diff --git a/plane-src/apps/api/plane/app/views/voice_tasker.py b/plane-src/apps/api/plane/app/views/voice_tasker.py index fc29be2..39efa90 100644 --- a/plane-src/apps/api/plane/app/views/voice_tasker.py +++ b/plane-src/apps/api/plane/app/views/voice_tasker.py @@ -70,6 +70,7 @@ VOICE_TASK_CONCURRENCY_RETRY_AFTER_SECONDS = 15 VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS = 30 * 60 VOICE_TASK_MONITOR_WINDOW_SECONDS = 24 * 60 * 60 VOICE_TASK_MONITOR_RECENT_LIMIT = 20 +VOICE_TASK_RETENTION_BATCH_SIZE = 500 VOICE_TASK_RATE_LIMIT_ERROR_CODES = { "voice_task_user_hourly_limit_exceeded", "voice_task_workspace_hourly_limit_exceeded", @@ -2962,6 +2963,7 @@ def serialize_voice_task_monitor_session(voice_session, stale_threshold=None): "parser_prompt_tokens": voice_session.parser_prompt_tokens, "parser_completion_tokens": voice_session.parser_completion_tokens, "parser_total_tokens": voice_session.parser_total_tokens, + "sensitive_data_redacted_at": voice_session.sensitive_data_redacted_at, }, "error": { "code": voice_session.error_code, @@ -3000,6 +3002,7 @@ def get_voice_task_monitor_payload(workspace): total_audio_size=Sum("audio_size"), parser_total_tokens=Sum("parser_total_tokens"), ) + redacted_count = base_sessions.filter(sensitive_data_redacted_at__isnull=False).count() error_counts = list( window_sessions.filter(status=VoiceTaskSession.Status.FAILED) .exclude(error_code="") @@ -3036,6 +3039,7 @@ def get_voice_task_monitor_payload(workspace): "total_audio_seconds": aggregate["total_audio_seconds"] or 0, "total_audio_size": aggregate["total_audio_size"] or 0, "parser_total_tokens": aggregate["parser_total_tokens"] or 0, + "redacted": redacted_count, "status_counts": status_counts, "error_counts": error_counts, }, @@ -3050,6 +3054,112 @@ def get_voice_task_monitor_payload(workspace): } +def redact_voice_task_sensitive_data(workspace=None, now=None, batch_size=VOICE_TASK_RETENTION_BATCH_SIZE): + now = now or timezone.now() + settings_queryset = WorkspaceAISettings.objects.select_related("workspace") + if workspace: + settings_queryset = settings_queryset.filter(workspace=workspace) + + redacted_count = 0 + workspace_results = [] + eligible_statuses = [VoiceTaskSession.Status.PARSED, VoiceTaskSession.Status.FAILED] + + for ai_settings in settings_queryset: + retention_days = max(int(ai_settings.sensitive_data_retention_days or 0), 1) + cutoff = now - timedelta(days=retention_days) + queryset = VoiceTaskSession.objects.filter( + workspace=ai_settings.workspace, + status__in=eligible_statuses, + updated_at__lte=cutoff, + sensitive_data_redacted_at__isnull=True, + ).filter( + Q(transcript__gt="") + | ~Q(parsed_json={}) + | ~Q(client_context={}) + ) + session_ids = list(queryset.values_list("id", flat=True)[:batch_size]) + if not session_ids: + workspace_results.append( + { + "workspace_id": str(ai_settings.workspace_id), + "workspace_slug": ai_settings.workspace.slug, + "retention_days": retention_days, + "redacted": 0, + } + ) + continue + + updated = VoiceTaskSession.objects.filter(id__in=session_ids).update( + transcript="", + parsed_json={}, + client_context={}, + sensitive_data_redacted_at=now, + ) + redacted_count += updated + workspace_results.append( + { + "workspace_id": str(ai_settings.workspace_id), + "workspace_slug": ai_settings.workspace.slug, + "retention_days": retention_days, + "redacted": updated, + } + ) + + return { + "ok": True, + "redacted_count": redacted_count, + "workspaces": workspace_results, + } + + +def cleanup_voice_task_stale_audio(workspace=None, now=None): + now = now or timezone.now() + stale_cutoff = now - timedelta(seconds=VOICE_TASK_ACTIVE_SESSION_STALE_SECONDS) + queryset = VoiceTaskSession.objects.filter( + status__in=VOICE_TASK_ACTIVE_SESSION_STATUSES, + updated_at__lt=stale_cutoff, + ) + if workspace: + queryset = queryset.filter(workspace=workspace) + + cleaned_count = 0 + for voice_session in queryset: + voice_session.status = VoiceTaskSession.Status.FAILED + voice_session.failed_at = now + voice_session.processing_duration_ms = get_voice_task_duration_ms(voice_session.processing_started_at, now) + voice_session.error_code = "voice_task_stale_session" + voice_session.error_message = "Voice Tasker session was marked as stale by retention cleanup." + voice_session.save( + update_fields=[ + "status", + "failed_at", + "processing_duration_ms", + "error_code", + "error_message", + "updated_at", + ] + ) + try: + clear_voice_task_audio_file(voice_session) + except Exception as exc: + log_exception(exc) + cleaned_count += 1 + + return cleaned_count + + +def cleanup_voice_task_sessions(workspace=None): + now = timezone.now() + cleaned_stale_count = cleanup_voice_task_stale_audio(workspace=workspace, now=now) + redaction_result = redact_voice_task_sensitive_data(workspace=workspace, now=now) + return { + "ok": True, + "cleaned_stale_count": cleaned_stale_count, + "redacted_count": redaction_result["redacted_count"], + "workspaces": redaction_result["workspaces"], + } + + class VoiceTaskMonitorEndpoint(BaseAPIView): @allow_permission(allowed_roles=[ROLE.ADMIN], level="WORKSPACE") def get(self, request, slug): @@ -3060,12 +3170,18 @@ class VoiceTaskMonitorEndpoint(BaseAPIView): def post(self, request, slug): workspace = Workspace.objects.get(slug=slug) action = request.data.get("action") - if action != "fail_stale": + if action not in {"fail_stale", "run_retention"}: return Response( {"ok": False, "code": "unsupported_action", "error": "Unsupported Voice Task monitor action."}, status=status.HTTP_400_BAD_REQUEST, ) + if action == "run_retention": + result = cleanup_voice_task_sessions(workspace=workspace) + payload = get_voice_task_monitor_payload(workspace) + payload.update(result) + return Response(payload, status=status.HTTP_200_OK) + now = timezone.now() stale_sessions = list( VoiceTaskSession.objects.filter( diff --git a/plane-src/apps/api/plane/bgtasks/voice_tasker_task.py b/plane-src/apps/api/plane/bgtasks/voice_tasker_task.py index 02c3eed..e44b829 100644 --- a/plane-src/apps/api/plane/bgtasks/voice_tasker_task.py +++ b/plane-src/apps/api/plane/bgtasks/voice_tasker_task.py @@ -10,3 +10,10 @@ def process_voice_task_session(voice_session_id): from plane.app.views.voice_tasker import process_voice_task_session_pipeline process_voice_task_session_pipeline(voice_session_id) + + +@shared_task +def cleanup_voice_task_sessions(): + from plane.app.views.voice_tasker import cleanup_voice_task_sessions as cleanup_voice_task_sessions_impl + + return cleanup_voice_task_sessions_impl() diff --git a/plane-src/apps/api/plane/celery.py b/plane-src/apps/api/plane/celery.py index 562d048..3724a33 100644 --- a/plane-src/apps/api/plane/celery.py +++ b/plane-src/apps/api/plane/celery.py @@ -73,6 +73,10 @@ app.conf.beat_schedule = { "task": "plane.bgtasks.cleanup_task.delete_webhook_logs", "schedule": crontab(hour=3, minute=30), # UTC 03:30 }, + "check-every-day-to-cleanup-voice-tasker": { + "task": "plane.bgtasks.voice_tasker_task.cleanup_voice_task_sessions", + "schedule": crontab(hour=3, minute=40), # UTC 03:40 + }, "check-every-day-to-delete-exporter-history": { "task": "plane.bgtasks.exporter_expired_task.delete_old_s3_link", "schedule": crontab(hour=3, minute=45), # UTC 03:45 diff --git a/plane-src/apps/api/plane/db/migrations/0134_voice_tasker_retention.py b/plane-src/apps/api/plane/db/migrations/0134_voice_tasker_retention.py new file mode 100644 index 0000000..2e7db8c --- /dev/null +++ b/plane-src/apps/api/plane/db/migrations/0134_voice_tasker_retention.py @@ -0,0 +1,23 @@ +# Generated by Codex on 2026-04-28 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("db", "0133_voice_tasker_observability"), + ] + + operations = [ + migrations.AddField( + model_name="workspaceaisettings", + name="sensitive_data_retention_days", + field=models.PositiveIntegerField(default=30), + ), + migrations.AddField( + model_name="voicetasksession", + name="sensitive_data_redacted_at", + field=models.DateTimeField(blank=True, null=True), + ), + ] diff --git a/plane-src/apps/api/plane/db/models/voice_tasker.py b/plane-src/apps/api/plane/db/models/voice_tasker.py index 0d6e3be..ade4844 100644 --- a/plane-src/apps/api/plane/db/models/voice_tasker.py +++ b/plane-src/apps/api/plane/db/models/voice_tasker.py @@ -56,6 +56,7 @@ class WorkspaceAISettings(BaseModel): workspace_daily_limit = models.PositiveIntegerField(default=1000) project_daily_limit = models.PositiveIntegerField(default=300) workspace_concurrency_limit = models.PositiveIntegerField(default=3) + sensitive_data_retention_days = models.PositiveIntegerField(default=30) class Meta: verbose_name = "Workspace AI Settings" @@ -139,6 +140,7 @@ class VoiceTaskSession(BaseModel): parser_prompt_tokens = models.PositiveIntegerField(null=True, blank=True) parser_completion_tokens = models.PositiveIntegerField(null=True, blank=True) parser_total_tokens = models.PositiveIntegerField(null=True, blank=True) + sensitive_data_redacted_at = models.DateTimeField(null=True, blank=True) created_task = models.ForeignKey( "db.Issue", on_delete=models.SET_NULL, diff --git a/plane-src/apps/api/plane/settings/storage.py b/plane-src/apps/api/plane/settings/storage.py index 3d67902..562600f 100644 --- a/plane-src/apps/api/plane/settings/storage.py +++ b/plane-src/apps/api/plane/settings/storage.py @@ -23,6 +23,8 @@ class S3Storage(S3Boto3Storage): """S3 storage class to generate presigned URLs for S3 objects""" def __init__(self, request=None, is_server=False, use_internal_endpoint=False, **kwargs): + super().__init__(**kwargs) + # Get the AWS credentials and bucket name from the environment self.aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID") # Use the AWS_SECRET_ACCESS_KEY environment variable for the secret key diff --git a/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx b/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx index abd13fc..d53b5e0 100644 --- a/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx +++ b/plane-src/apps/web/core/components/workspace/settings/ai-voice-tasker-settings.tsx @@ -52,6 +52,7 @@ type TFormState = { workspace_daily_limit: number; project_daily_limit: number; workspace_concurrency_limit: number; + sensitive_data_retention_days: number; openai_api_key: string; }; @@ -75,6 +76,7 @@ const getInitialFormState = (settings?: TWorkspaceAISettings): TFormState => ({ workspace_daily_limit: settings?.workspace_daily_limit ?? 1000, project_daily_limit: settings?.project_daily_limit ?? 300, workspace_concurrency_limit: settings?.workspace_concurrency_limit ?? 3, + sensitive_data_retention_days: settings?.sensitive_data_retention_days ?? 30, openai_api_key: "", }); @@ -116,6 +118,7 @@ export const AIVoiceTaskerSettingsContent = observer(function AIVoiceTaskerSetti const [isSaving, setIsSaving] = useState(false); const [isTesting, setIsTesting] = useState(false); const [isCleaningStaleSessions, setIsCleaningStaleSessions] = useState(false); + const [isRunningRetention, setIsRunningRetention] = useState(false); // store hooks const { currentWorkspace } = useWorkspace(); const { fetchProjects, projectMap } = useProject(); @@ -208,6 +211,7 @@ export const AIVoiceTaskerSettingsContent = observer(function AIVoiceTaskerSetti workspace_daily_limit: formState.workspace_daily_limit, project_daily_limit: formState.project_daily_limit, workspace_concurrency_limit: formState.workspace_concurrency_limit, + sensitive_data_retention_days: formState.sensitive_data_retention_days, }; if (formState.openai_api_key.trim()) payload.openai_api_key = formState.openai_api_key.trim(); @@ -272,6 +276,25 @@ export const AIVoiceTaskerSettingsContent = observer(function AIVoiceTaskerSetti } }; + const handleRunRetention = async () => { + setIsRunningRetention(true); + try { + const response = await workspaceAIService.runVoiceTaskRetention(workspaceSlug); + await mutateMonitor(response, false); + setToast({ + type: TOAST_TYPE.SUCCESS, + title: `Retention выполнен: очищено ${response.redacted_count ?? 0}`, + }); + } catch { + setToast({ + type: TOAST_TYPE.ERROR, + title: "Не удалось запустить retention Voice Tasker", + }); + } finally { + setIsRunningRetention(false); + } + }; + if (workspaceUserInfo && !canPerformWorkspaceAdminActions) { return ; } @@ -453,14 +476,25 @@ export const AIVoiceTaskerSettingsContent = observer(function AIVoiceTaskerSetti onChange={(value) => updateFormValue("workspace_concurrency_limit", value)} /> + + updateFormValue("sensitive_data_retention_days", value)} + /> +
@@ -650,15 +684,19 @@ function getWorkspaceRoleLabel(role: IWorkspaceMember["role"]) { type TVoiceTaskMonitorSectionProps = { isCleaning: boolean; isLoading: boolean; + isRunningRetention: boolean; monitor?: TVoiceTaskMonitor; onCleanupStale: () => void; + onRunRetention: () => void; }; function VoiceTaskMonitorSection({ isCleaning, isLoading, + isRunningRetention, monitor, onCleanupStale, + onRunRetention, }: TVoiceTaskMonitorSectionProps) { const staleCount = monitor?.summary.stale ?? 0; const recentSessions = monitor?.recent_sessions ?? []; @@ -671,17 +709,30 @@ function VoiceTaskMonitorSection({ title="Очередь и диагностика" description="Мониторинг обработок Voice Tasker за последние 24 часа: активные job, ошибки, latency и расход parser tokens." right={ - +
+ + +
} /> @@ -727,6 +778,12 @@ function VoiceTaskMonitorSection({ value={formatMonitorNumber(monitor.summary.parser_total_tokens)} meta="OpenAI usage proxy" /> + {formatDateTime(session.timings.completed_at || session.timings.failed_at || session.timings.created_at)} + {session.usage.sensitive_data_redacted_at && ( + + очищено + + )}
{session.error.code &&
{session.error.code}
} diff --git a/plane-src/apps/web/core/services/workspace-ai.service.ts b/plane-src/apps/web/core/services/workspace-ai.service.ts index 9dd03eb..771d7b3 100644 --- a/plane-src/apps/web/core/services/workspace-ai.service.ts +++ b/plane-src/apps/web/core/services/workspace-ai.service.ts @@ -62,6 +62,14 @@ export class WorkspaceAIService extends APIService { }); } + async runVoiceTaskRetention(workspaceSlug: string): Promise { + return this.post(`/api/workspaces/${workspaceSlug}/voice-tasker/monitor/`, { action: "run_retention" }) + .then((response) => response?.data) + .catch((error) => { + throw error?.response?.data; + }); + } + async retrieveVoiceTaskPreflight(workspaceSlug: string, projectId?: string | null): Promise { const params = projectId ? `?project_id=${projectId}` : ""; return this.get( diff --git a/plane-src/packages/types/src/ai.ts b/plane-src/packages/types/src/ai.ts index 36bf066..b066436 100644 --- a/plane-src/packages/types/src/ai.ts +++ b/plane-src/packages/types/src/ai.ts @@ -47,6 +47,7 @@ export type TWorkspaceAISettings = { workspace_daily_limit: number; project_daily_limit: number; workspace_concurrency_limit: number; + sensitive_data_retention_days: number; credential: TWorkspaceAICredentialStatus; created_at: string; updated_at: string; @@ -69,6 +70,7 @@ export type TWorkspaceAISettingsPayload = Partial< | "workspace_daily_limit" | "project_daily_limit" | "workspace_concurrency_limit" + | "sensitive_data_retention_days" > > & { openai_api_key?: string; @@ -257,6 +259,7 @@ export type TVoiceTaskMonitorSession = { parser_prompt_tokens: number | null; parser_completion_tokens: number | null; parser_total_tokens: number | null; + sensitive_data_redacted_at: string | null; }; error: { code: string; @@ -270,6 +273,8 @@ export type TVoiceTaskMonitor = { window_seconds: number; stale_after_seconds: number; cleaned_count?: number; + cleaned_stale_count?: number; + redacted_count?: number; concurrency: { scope: "workspace"; limit: number; @@ -289,6 +294,7 @@ export type TVoiceTaskMonitor = { total_audio_seconds: number; total_audio_size: number; parser_total_tokens: number; + redacted: number; status_counts: Record; error_counts: { error_code: string;