ФУНКЦИИ - МЕЖПРОЕКТНАЯ КОММУНИКАЦИЯ: blob-дедупликация файлов

This commit is contained in:
DCCONSTRUCTIONS 2026-04-27 09:45:16 +03:00
parent b945bc4d31
commit 490aa1bc04
18 changed files with 827 additions and 114 deletions

View File

@ -116,6 +116,7 @@ class FileAssetSerializer(BaseSerializer):
"page",
"draft_issue",
"user",
"blob",
"is_deleted",
"deleted_at",
"storage_metadata",

View File

@ -678,6 +678,7 @@ class IssueAttachmentSerializer(BaseSerializer):
"workspace",
"project",
"issue",
"blob",
"updated_by",
"updated_at",
]

View File

@ -15,7 +15,6 @@ from rest_framework.response import Response
from drf_spectacular.utils import OpenApiExample, OpenApiRequest
# Module Imports
from plane.bgtasks.storage_metadata_task import get_asset_object_metadata
from plane.settings.storage import S3Storage
from plane.db.models import FileAsset, User, Workspace
from plane.api.views.base import BaseAPIView
@ -44,6 +43,11 @@ from plane.utils.openapi import (
)
from plane.utils.exception_logger import log_exception
from plane.utils.upload_limits import resolve_workspace_upload_size_limit
from plane.utils.file_dedup import (
UploadedObjectMissing,
confirm_uploaded_file_asset,
release_file_asset_blob,
)
class UserAssetEndpoint(BaseAPIView):
@ -53,9 +57,11 @@ class UserAssetEndpoint(BaseAPIView):
asset = FileAsset.objects.filter(id=asset_id).first()
if asset is None:
return
if not asset.is_uploaded:
release_file_asset_blob(asset, delete_untracked_object=True)
asset.is_deleted = True
asset.deleted_at = timezone.now()
asset.save(update_fields=["is_deleted", "deleted_at"])
asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
return
def entity_asset_delete(self, entity_type, asset, request):
@ -209,15 +215,17 @@ class UserAssetEndpoint(BaseAPIView):
"""
# get the asset id
asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id)
# get the storage metadata
asset.is_uploaded = True
# get the storage metadata
if not asset.storage_metadata:
get_asset_object_metadata.delay(asset_id=str(asset_id))
# update the attributes
asset.attributes = request.data.get("attributes", asset.attributes)
# save the asset
asset.save(update_fields=["is_uploaded", "attributes"])
try:
confirm_uploaded_file_asset(
asset,
request=request,
attributes=request.data.get("attributes", asset.attributes),
)
except UploadedObjectMissing:
return Response(
{"error": "The uploaded asset object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)
return Response(status=status.HTTP_204_NO_CONTENT)
@asset_docs(
@ -236,11 +244,13 @@ class UserAssetEndpoint(BaseAPIView):
This performs a soft delete by marking the asset as deleted and updating the user's profile.
"""
asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id)
if not asset.is_uploaded:
release_file_asset_blob(asset, request=request, delete_untracked_object=True)
asset.is_deleted = True
asset.deleted_at = timezone.now()
# get the entity and save the asset id for the request field
self.entity_asset_delete(entity_type=asset.entity_type, asset=asset, request=request)
asset.save(update_fields=["is_deleted", "deleted_at"])
asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
return Response(status=status.HTTP_204_NO_CONTENT)
@ -251,9 +261,11 @@ class UserServerAssetEndpoint(BaseAPIView):
asset = FileAsset.objects.filter(id=asset_id).first()
if asset is None:
return
if not asset.is_uploaded:
release_file_asset_blob(asset, delete_untracked_object=True)
asset.is_deleted = True
asset.deleted_at = timezone.now()
asset.save(update_fields=["is_deleted", "deleted_at"])
asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
return
def entity_asset_delete(self, entity_type, asset, request):
@ -365,15 +377,17 @@ class UserServerAssetEndpoint(BaseAPIView):
"""
# get the asset id
asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id)
# get the storage metadata
asset.is_uploaded = True
# get the storage metadata
if not asset.storage_metadata:
get_asset_object_metadata.delay(asset_id=str(asset_id))
# update the attributes
asset.attributes = request.data.get("attributes", asset.attributes)
# save the asset
asset.save(update_fields=["is_uploaded", "attributes"])
try:
confirm_uploaded_file_asset(
asset,
request=request,
attributes=request.data.get("attributes", asset.attributes),
)
except UploadedObjectMissing:
return Response(
{"error": "The uploaded asset object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)
return Response(status=status.HTTP_204_NO_CONTENT)
@asset_docs(
@ -393,11 +407,13 @@ class UserServerAssetEndpoint(BaseAPIView):
asset as deleted and updating the user's profile.
"""
asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id)
if not asset.is_uploaded:
release_file_asset_blob(asset, request=request, delete_untracked_object=True)
asset.is_deleted = True
asset.deleted_at = timezone.now()
# get the entity and save the asset id for the request field
self.entity_asset_delete(entity_type=asset.entity_type, asset=asset, request=request)
asset.save(update_fields=["is_deleted", "deleted_at"])
asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
return Response(status=status.HTTP_204_NO_CONTENT)
@ -604,15 +620,21 @@ class GenericAssetEndpoint(BaseAPIView):
try:
asset = FileAsset.objects.get(id=asset_id, workspace__slug=slug, is_deleted=False)
# Update is_uploaded status
asset.is_uploaded = request.data.get("is_uploaded", asset.is_uploaded)
# Update storage metadata if not present
if not asset.storage_metadata:
get_asset_object_metadata.delay(asset_id=str(asset_id))
asset.save(update_fields=["is_uploaded"])
if request.data.get("is_uploaded", asset.is_uploaded):
confirm_uploaded_file_asset(
asset,
request=request,
attributes=request.data.get("attributes"),
)
else:
asset.is_uploaded = False
asset.save(update_fields=["is_uploaded", "updated_at"])
return Response(status=status.HTTP_204_NO_CONTENT)
except FileAsset.DoesNotExist:
return Response({"error": "Asset not found"}, status=status.HTTP_404_NOT_FOUND)
except UploadedObjectMissing:
return Response(
{"error": "The uploaded asset object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)

View File

@ -82,11 +82,11 @@ from plane.db.models import (
Workspace,
)
from plane.settings.storage import S3Storage
from plane.bgtasks.storage_metadata_task import get_asset_object_metadata
from .base import BaseAPIView
from plane.utils.host import base_host
from plane.utils.issue_relation_mapper import get_actual_relation
from plane.utils.attachment_preview import attachment_object_exists, get_attachment_preview_response
from plane.utils.file_dedup import finalize_uploaded_file_asset, release_file_asset_blob, UploadedObjectMissing
from plane.utils.upload_limits import resolve_workspace_upload_size_limit
from plane.bgtasks.webhook_task import model_activity
from plane.app.permissions import ROLE
@ -2020,9 +2020,11 @@ class IssueAttachmentDetailAPIEndpoint(BaseAPIView):
)
issue_attachment = FileAsset.objects.get(pk=pk, workspace__slug=slug, project_id=project_id)
if not issue_attachment.is_uploaded:
release_file_asset_blob(issue_attachment, request=request, delete_untracked_object=True)
issue_attachment.is_deleted = True
issue_attachment.deleted_at = timezone.now()
issue_attachment.save()
issue_attachment.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
issue_activity.delay(
type="attachment.activity.deleted",
@ -2036,10 +2038,6 @@ class IssueAttachmentDetailAPIEndpoint(BaseAPIView):
origin=base_host(request=request, is_app=True),
)
# Get the storage metadata
if not issue_attachment.storage_metadata:
get_asset_object_metadata.delay(str(issue_attachment.id))
issue_attachment.save()
return Response(status=status.HTTP_204_NO_CONTENT)
@issue_attachment_docs(
@ -2174,14 +2172,16 @@ class IssueAttachmentDetailAPIEndpoint(BaseAPIView):
origin=base_host(request=request, is_app=True),
)
# Update the attachment
issue_attachment.is_uploaded = True
issue_attachment.created_by = request.user
issue_attachment.save(update_fields=["created_by", "updated_at"])
# Get the storage metadata
if not issue_attachment.storage_metadata:
get_asset_object_metadata.delay(str(issue_attachment.id))
issue_attachment.save()
try:
finalize_uploaded_file_asset(issue_attachment, request=request)
except UploadedObjectMissing:
return Response(
{"error": "The uploaded attachment object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)
return Response(status=status.HTTP_204_NO_CONTENT)

View File

@ -10,4 +10,4 @@ class FileAssetSerializer(BaseSerializer):
class Meta:
model = FileAsset
fields = "__all__"
read_only_fields = ["created_by", "updated_by", "created_at", "updated_at"]
read_only_fields = ["created_by", "updated_by", "created_at", "updated_at", "blob"]

View File

@ -630,6 +630,7 @@ class IssueAttachmentSerializer(BaseSerializer):
"workspace",
"project",
"issue",
"blob",
]

View File

@ -11,6 +11,7 @@ from rest_framework.parsers import MultiPartParser, FormParser, JSONParser
from ..base import BaseAPIView, BaseViewSet
from plane.db.models import FileAsset, Workspace
from plane.app.serializers import FileAssetSerializer
from plane.utils.file_dedup import UploadedObjectMissing, confirm_uploaded_file_asset, release_file_asset_blob
class FileAssetEndpoint(BaseAPIView):
@ -37,15 +38,24 @@ class FileAssetEndpoint(BaseAPIView):
if serializer.is_valid():
# Get the workspace
workspace = Workspace.objects.get(slug=slug)
serializer.save(workspace_id=workspace.id)
asset = serializer.save(workspace_id=workspace.id)
try:
confirm_uploaded_file_asset(asset, request=request)
except UploadedObjectMissing:
return Response(
{"error": "The uploaded asset object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def delete(self, request, workspace_id, asset_key):
asset_key = str(workspace_id) + "/" + asset_key
file_asset = FileAsset.objects.get(asset=asset_key)
if not file_asset.is_uploaded:
release_file_asset_blob(file_asset, request=request, delete_untracked_object=True)
file_asset.is_deleted = True
file_asset.save(update_fields=["is_deleted"])
file_asset.save(update_fields=["is_deleted", "updated_at"])
return Response(status=status.HTTP_204_NO_CONTENT)
@ -75,12 +85,21 @@ class UserAssetsEndpoint(BaseAPIView):
def post(self, request):
serializer = FileAssetSerializer(data=request.data)
if serializer.is_valid():
serializer.save()
asset = serializer.save()
try:
confirm_uploaded_file_asset(asset, request=request)
except UploadedObjectMissing:
return Response(
{"error": "The uploaded asset object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def delete(self, request, asset_key):
file_asset = FileAsset.objects.get(asset=asset_key, created_by=request.user)
if not file_asset.is_uploaded:
release_file_asset_blob(file_asset, request=request, delete_untracked_object=True)
file_asset.is_deleted = True
file_asset.save(update_fields=["is_deleted"])
file_asset.save(update_fields=["is_deleted", "updated_at"])
return Response(status=status.HTTP_204_NO_CONTENT)

View File

@ -22,9 +22,14 @@ from plane.db.models import FileAsset, Workspace, Project, User
from plane.settings.storage import S3Storage
from plane.app.permissions import allow_permission, ROLE
from plane.utils.cache import invalidate_cache_directly
from plane.bgtasks.storage_metadata_task import get_asset_object_metadata
from plane.throttles.asset import AssetRateThrottle
from plane.utils.upload_limits import resolve_workspace_upload_size_limit
from plane.utils.file_dedup import (
UploadedObjectMissing,
attach_existing_blob_to_file_asset,
confirm_uploaded_file_asset,
release_file_asset_blob,
)
class UserAssetsV2Endpoint(BaseAPIView):
@ -34,9 +39,11 @@ class UserAssetsV2Endpoint(BaseAPIView):
asset = FileAsset.objects.filter(id=asset_id).first()
if asset is None:
return
if not asset.is_uploaded:
release_file_asset_blob(asset, delete_untracked_object=True)
asset.is_deleted = True
asset.deleted_at = timezone.now()
asset.save(update_fields=["is_deleted", "deleted_at"])
asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
return
def entity_asset_save(self, asset_id, entity_type, asset, request):
@ -171,11 +178,17 @@ class UserAssetsV2Endpoint(BaseAPIView):
def patch(self, request, asset_id):
# get the asset id
asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id)
# get the storage metadata
asset.is_uploaded = True
# get the storage metadata
if not asset.storage_metadata:
get_asset_object_metadata.delay(asset_id=str(asset_id))
try:
asset = confirm_uploaded_file_asset(
asset,
request=request,
attributes=request.data.get("attributes", asset.attributes),
)
except UploadedObjectMissing:
return Response(
{"error": "The uploaded asset object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)
# get the entity and save the asset id for the request field
self.entity_asset_save(
asset_id=asset_id,
@ -183,19 +196,17 @@ class UserAssetsV2Endpoint(BaseAPIView):
asset=asset,
request=request,
)
# update the attributes
asset.attributes = request.data.get("attributes", asset.attributes)
# save the asset
asset.save(update_fields=["is_uploaded", "attributes"])
return Response(status=status.HTTP_204_NO_CONTENT)
def delete(self, request, asset_id):
asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id)
if not asset.is_uploaded:
release_file_asset_blob(asset, request=request, delete_untracked_object=True)
asset.is_deleted = True
asset.deleted_at = timezone.now()
# get the entity and save the asset id for the request field
self.entity_asset_delete(entity_type=asset.entity_type, asset=asset, request=request)
asset.save(update_fields=["is_deleted", "deleted_at"])
asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
return Response(status=status.HTTP_204_NO_CONTENT)
@ -239,10 +250,12 @@ class WorkspaceFileAssetEndpoint(BaseAPIView):
# Check if the asset exists
if asset is None:
return
if not asset.is_uploaded:
release_file_asset_blob(asset, delete_untracked_object=True)
# Mark the asset as deleted
asset.is_deleted = True
asset.deleted_at = timezone.now()
asset.save(update_fields=["is_deleted", "deleted_at"])
asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
return
def entity_asset_save(self, asset_id, entity_type, asset, request):
@ -380,11 +393,17 @@ class WorkspaceFileAssetEndpoint(BaseAPIView):
def patch(self, request, slug, asset_id):
# get the asset id
asset = FileAsset.objects.get(id=asset_id, workspace__slug=slug)
# get the storage metadata
asset.is_uploaded = True
# get the storage metadata
if not asset.storage_metadata:
get_asset_object_metadata.delay(asset_id=str(asset_id))
try:
asset = confirm_uploaded_file_asset(
asset,
request=request,
attributes=request.data.get("attributes", asset.attributes),
)
except UploadedObjectMissing:
return Response(
{"error": "The uploaded asset object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)
# get the entity and save the asset id for the request field
self.entity_asset_save(
asset_id=asset_id,
@ -392,19 +411,17 @@ class WorkspaceFileAssetEndpoint(BaseAPIView):
asset=asset,
request=request,
)
# update the attributes
asset.attributes = request.data.get("attributes", asset.attributes)
# save the asset
asset.save(update_fields=["is_uploaded", "attributes"])
return Response(status=status.HTTP_204_NO_CONTENT)
def delete(self, request, slug, asset_id):
asset = FileAsset.objects.get(id=asset_id, workspace__slug=slug)
if not asset.is_uploaded:
release_file_asset_blob(asset, request=request, delete_untracked_object=True)
asset.is_deleted = True
asset.deleted_at = timezone.now()
# get the entity and save the asset id for the request field
self.entity_asset_delete(entity_type=asset.entity_type, asset=asset, request=request)
asset.save(update_fields=["is_deleted", "deleted_at"])
asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
return Response(status=status.HTTP_204_NO_CONTENT)
def get(self, request, slug, asset_id):
@ -581,27 +598,30 @@ class ProjectAssetEndpoint(BaseAPIView):
def patch(self, request, slug, project_id, pk):
# get the asset id
asset = FileAsset.objects.get(id=pk, workspace__slug=slug, project_id=project_id)
# get the storage metadata
asset.is_uploaded = True
# get the storage metadata
if not asset.storage_metadata:
get_asset_object_metadata.delay(asset_id=str(pk))
# update the attributes
asset.attributes = request.data.get("attributes", asset.attributes)
# save the asset
asset.save(update_fields=["is_uploaded", "attributes"])
try:
confirm_uploaded_file_asset(
asset,
request=request,
attributes=request.data.get("attributes", asset.attributes),
)
except UploadedObjectMissing:
return Response(
{"error": "The uploaded asset object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)
return Response(status=status.HTTP_204_NO_CONTENT)
@allow_permission([ROLE.ADMIN, ROLE.MEMBER, ROLE.GUEST])
def delete(self, request, slug, project_id, pk):
# Get the asset
asset = FileAsset.objects.get(id=pk, workspace__slug=slug, project_id=project_id)
if not asset.is_uploaded:
release_file_asset_blob(asset, request=request, delete_untracked_object=True)
# Check deleted assets
asset.is_deleted = True
asset.deleted_at = timezone.now()
# Save the asset
asset.save(update_fields=["is_deleted", "deleted_at"])
asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
return Response(status=status.HTTP_204_NO_CONTENT)
@allow_permission([ROLE.ADMIN, ROLE.MEMBER, ROLE.GUEST])
@ -753,7 +773,7 @@ class DuplicateAssetEndpoint(BaseAPIView):
return Response({"error": "Project not found"}, status=status.HTTP_404_NOT_FOUND)
storage = S3Storage(request=request)
original_asset = FileAsset.objects.filter(id=asset_id, is_uploaded=True).first()
original_asset = FileAsset.objects.filter(id=asset_id, workspace=workspace, is_uploaded=True).first()
if not original_asset:
return Response({"error": "Asset not found"}, status=status.HTTP_404_NOT_FOUND)
@ -774,9 +794,15 @@ class DuplicateAssetEndpoint(BaseAPIView):
storage_metadata=original_asset.storage_metadata,
**self.get_entity_id_field(entity_type=entity_type, entity_id=entity_id),
)
storage.copy_object(original_asset.asset, destination_key)
# Update the is_uploaded field for all newly created assets
FileAsset.objects.filter(id=duplicated_asset.id).update(is_uploaded=True)
if not attach_existing_blob_to_file_asset(original_asset, duplicated_asset):
storage.copy_object(original_asset.asset, destination_key)
try:
confirm_uploaded_file_asset(duplicated_asset, request=request)
except UploadedObjectMissing:
return Response(
{"error": "The source asset object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)
return Response({"asset_id": str(duplicated_asset.id)}, status=status.HTTP_200_OK)

View File

@ -23,10 +23,10 @@ from plane.db.models import FileAsset, Workspace
from plane.bgtasks.issue_activities_task import issue_activity
from plane.app.permissions import allow_permission, ROLE
from plane.settings.storage import S3Storage
from plane.bgtasks.storage_metadata_task import get_asset_object_metadata
from plane.utils.host import base_host
from plane.utils.upload_limits import resolve_workspace_upload_size_limit
from plane.utils.attachment_preview import attachment_object_exists, get_attachment_preview_response
from plane.utils.file_dedup import finalize_uploaded_file_asset, release_file_asset_blob, UploadedObjectMissing
class IssueAttachmentEndpoint(BaseAPIView):
@ -69,7 +69,9 @@ class IssueAttachmentEndpoint(BaseAPIView):
{"error": "Issue attachment not found."},
status=status.HTTP_404_NOT_FOUND,
)
issue_attachment.asset.delete(save=False)
release_result = release_file_asset_blob(issue_attachment, request=request)
if not release_result.had_blob:
issue_attachment.asset.delete(save=False)
issue_attachment.delete()
issue_activity.delay(
type="attachment.activity.deleted",
@ -155,9 +157,11 @@ class IssueAttachmentV2Endpoint(BaseAPIView):
@allow_permission([ROLE.ADMIN], creator=True, model=FileAsset)
def delete(self, request, slug, project_id, issue_id, pk):
issue_attachment = FileAsset.objects.get(pk=pk, workspace__slug=slug, project_id=project_id)
if not issue_attachment.is_uploaded:
release_file_asset_blob(issue_attachment, request=request, delete_untracked_object=True)
issue_attachment.is_deleted = True
issue_attachment.deleted_at = timezone.now()
issue_attachment.save()
issue_attachment.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
issue_activity.delay(
type="attachment.activity.deleted",
@ -225,12 +229,14 @@ class IssueAttachmentV2Endpoint(BaseAPIView):
origin=base_host(request=request, is_app=True),
)
# Update the attachment
issue_attachment.is_uploaded = True
issue_attachment.created_by = request.user
issue_attachment.save(update_fields=["created_by", "updated_at"])
# Get the storage metadata
if not issue_attachment.storage_metadata:
get_asset_object_metadata.delay(str(issue_attachment.id))
issue_attachment.save()
try:
finalize_uploaded_file_asset(issue_attachment, request=request)
except UploadedObjectMissing:
return Response(
{"error": "The uploaded attachment object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)
return Response(status=status.HTTP_204_NO_CONTENT)

View File

@ -17,6 +17,7 @@ from plane.utils.exception_logger import log_exception
from plane.settings.storage import S3Storage
from celery import shared_task
from plane.utils.url import normalize_url_path
from plane.utils.file_dedup import attach_existing_blob_to_file_asset, confirm_uploaded_file_asset
def get_entity_id_field(entity_type, entity_id):
@ -108,15 +109,16 @@ def copy_assets(entity, entity_identifier, project_id, asset_ids, user_id):
storage_metadata=original_asset.storage_metadata,
**get_entity_id_field(original_asset.entity_type, entity_identifier),
)
storage.copy_object(original_asset.asset, destination_key)
if not attach_existing_blob_to_file_asset(original_asset, duplicated_asset):
storage.copy_object(original_asset.asset, destination_key)
confirm_uploaded_file_asset(duplicated_asset)
duplicated_assets.append(
{
"new_asset_id": str(duplicated_asset.id),
"old_asset_id": str(original_asset.id),
}
)
if duplicated_assets:
FileAsset.objects.filter(pk__in=[item["new_asset_id"] for item in duplicated_assets]).update(is_uploaded=True)
return duplicated_assets

View File

@ -15,12 +15,16 @@ from celery import shared_task
# Module imports
from plane.db.models import FileAsset
from plane.utils.file_dedup import release_file_asset_blob
@shared_task
def delete_unuploaded_file_asset():
"""This task deletes unuploaded file assets older than a certain number of days."""
FileAsset.objects.filter(
stale_assets = FileAsset.objects.filter(
Q(created_at__lt=timezone.now() - timedelta(days=int(os.environ.get("UNUPLOADED_ASSET_DELETE_DAYS", "7"))))
& Q(is_uploaded=False)
).delete()
)
for asset in stale_assets.iterator():
release_file_asset_blob(asset, delete_untracked_object=True)
asset.delete()

View File

@ -0,0 +1,112 @@
# Generated by Codex on 2026-04-26
import uuid
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
("db", "0127_issue_detail_layout"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="StoredBlob",
fields=[
("created_at", models.DateTimeField(auto_now_add=True, verbose_name="Created At")),
("updated_at", models.DateTimeField(auto_now=True, verbose_name="Last Modified At")),
("deleted_at", models.DateTimeField(blank=True, null=True, verbose_name="Deleted At")),
(
"id",
models.UUIDField(
db_index=True,
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False,
unique=True,
),
),
("sha256", models.CharField(db_index=True, max_length=64)),
("size", models.PositiveBigIntegerField(default=0)),
("mime_type", models.CharField(blank=True, default="", max_length=255)),
("canonical_object_key", models.CharField(max_length=800)),
(
"status",
models.CharField(
choices=[("active", "Active"), ("orphaned", "Orphaned"), ("missing", "Missing")],
default="active",
max_length=32,
),
),
("ref_count", models.PositiveIntegerField(default=0)),
("storage_metadata", models.JSONField(blank=True, default=dict, null=True)),
(
"created_by",
models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="%(class)s_created_by",
to=settings.AUTH_USER_MODEL,
verbose_name="Created By",
),
),
(
"updated_by",
models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="%(class)s_updated_by",
to=settings.AUTH_USER_MODEL,
verbose_name="Last Modified By",
),
),
(
"workspace",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="stored_blobs",
to="db.workspace",
),
),
],
options={
"verbose_name": "Stored Blob",
"verbose_name_plural": "Stored Blobs",
"db_table": "stored_blobs",
"ordering": ("-created_at",),
},
),
migrations.AddField(
model_name="fileasset",
name="blob",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="assets",
to="db.storedblob",
),
),
migrations.AddIndex(
model_name="storedblob",
index=models.Index(fields=["workspace", "sha256"], name="stored_blob_workspace_sha_idx"),
),
migrations.AddIndex(
model_name="storedblob",
index=models.Index(fields=["status"], name="stored_blob_status_idx"),
),
migrations.AddConstraint(
model_name="storedblob",
constraint=models.UniqueConstraint(
condition=models.Q(("deleted_at__isnull", True), ("status", "active")),
fields=("workspace", "sha256", "size"),
name="stored_blob_unique_active_hash_size",
),
),
]

View File

@ -4,7 +4,7 @@
from .analytic import AnalyticView
from .api import APIActivityLog, APIToken
from .asset import FileAsset
from .asset import FileAsset, StoredBlob
from .base import BaseModel
from .cycle import Cycle, CycleIssue, CycleUserProperties
from .deploy_board import DeployBoard

View File

@ -60,6 +60,13 @@ class FileAsset(BaseModel):
size = models.FloatField(default=0)
is_uploaded = models.BooleanField(default=False)
storage_metadata = models.JSONField(default=dict, null=True, blank=True)
blob = models.ForeignKey(
"db.StoredBlob",
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name="assets",
)
class Meta:
verbose_name = "File Asset"
@ -98,3 +105,43 @@ class FileAsset(BaseModel):
return f"/api/assets/v2/workspaces/{self.workspace.slug}/projects/{self.project_id}/{self.id}/"
return None
class StoredBlob(BaseModel):
"""
Canonical stored object shared by one or more FileAsset records.
"""
class Status(models.TextChoices):
ACTIVE = "active", "Active"
ORPHANED = "orphaned", "Orphaned"
MISSING = "missing", "Missing"
workspace = models.ForeignKey("db.Workspace", on_delete=models.CASCADE, related_name="stored_blobs")
sha256 = models.CharField(max_length=64, db_index=True)
size = models.PositiveBigIntegerField(default=0)
mime_type = models.CharField(max_length=255, blank=True, default="")
canonical_object_key = models.CharField(max_length=800)
status = models.CharField(max_length=32, choices=Status.choices, default=Status.ACTIVE)
ref_count = models.PositiveIntegerField(default=0)
storage_metadata = models.JSONField(default=dict, null=True, blank=True)
class Meta:
verbose_name = "Stored Blob"
verbose_name_plural = "Stored Blobs"
db_table = "stored_blobs"
ordering = ("-created_at",)
indexes = [
models.Index(fields=["workspace", "sha256"], name="stored_blob_workspace_sha_idx"),
models.Index(fields=["status"], name="stored_blob_status_idx"),
]
constraints = [
models.UniqueConstraint(
fields=["workspace", "sha256", "size"],
condition=models.Q(deleted_at__isnull=True, status="active"),
name="stored_blob_unique_active_hash_size",
)
]
def __str__(self):
return self.canonical_object_key

View File

@ -22,7 +22,7 @@ class S3Storage(S3Boto3Storage):
"""S3 storage class to generate presigned URLs for S3 objects"""
def __init__(self, request=None):
def __init__(self, request=None, **kwargs):
# Get the AWS credentials and bucket name from the environment
self.aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
# Use the AWS_SECRET_ACCESS_KEY environment variable for the secret key

View File

@ -15,9 +15,9 @@ from rest_framework import status
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from plane.bgtasks.storage_metadata_task import get_asset_object_metadata
from plane.db.models import DeployBoard, FileAsset
from plane.settings.storage import S3Storage
from plane.utils.file_dedup import UploadedObjectMissing, confirm_uploaded_file_asset, release_file_asset_blob
# Module imports
from .base import BaseAPIView
@ -141,16 +141,17 @@ class EntityAssetEndpoint(BaseAPIView):
# get the asset id
asset = FileAsset.objects.get(id=pk, workspace=deploy_board.workspace)
# get the storage metadata
asset.is_uploaded = True
# get the storage metadata
if not asset.storage_metadata:
get_asset_object_metadata.delay(str(asset.id))
# update the attributes
asset.attributes = request.data.get("attributes", asset.attributes)
# save the asset
asset.save(update_fields=["attributes", "is_uploaded"])
try:
confirm_uploaded_file_asset(
asset,
request=request,
attributes=request.data.get("attributes", asset.attributes),
)
except UploadedObjectMissing:
return Response(
{"error": "The uploaded asset object was not found.", "status": False},
status=status.HTTP_400_BAD_REQUEST,
)
return Response(status=status.HTTP_204_NO_CONTENT)
def delete(self, request, anchor, pk):
@ -161,11 +162,13 @@ class EntityAssetEndpoint(BaseAPIView):
return Response({"error": "Project is not published"}, status=status.HTTP_404_NOT_FOUND)
# Get the asset
asset = FileAsset.objects.get(id=pk, workspace=deploy_board.workspace, project_id=deploy_board.project_id)
if not asset.is_uploaded:
release_file_asset_blob(asset, request=request, delete_untracked_object=True)
# Check deleted assets
asset.is_deleted = True
asset.deleted_at = timezone.now()
# Save the asset
asset.save(update_fields=["is_deleted", "deleted_at"])
asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"])
return Response(status=status.HTTP_204_NO_CONTENT)

View File

@ -0,0 +1,152 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
from unittest.mock import patch
import pytest
from plane.db.models import FileAsset, StoredBlob
from plane.utils.file_dedup import attach_existing_blob_to_file_asset, finalize_uploaded_file_asset, release_file_asset_blob
class FakeStreamingBody:
def __init__(self, payload):
self.payload = payload
self.closed = False
def iter_chunks(self, chunk_size):
for index in range(0, len(self.payload), chunk_size):
yield self.payload[index : index + chunk_size]
def close(self):
self.closed = True
class FakeS3Client:
def __init__(self, objects):
self.objects = objects
def get_object(self, Bucket, Key):
return {"Body": FakeStreamingBody(self.objects[Key]["body"])}
class FakeStorage:
def __init__(self, objects):
self.aws_storage_bucket_name = "uploads"
self.s3_client = FakeS3Client(objects)
self.deleted = []
self.objects = objects
def get_object_metadata(self, object_name):
item = self.objects.get(object_name)
if item is None:
return None
return {
"ContentType": item["type"],
"ContentLength": len(item["body"]),
"ETag": item.get("etag", "etag"),
"Metadata": {},
}
def delete_files(self, object_names):
self.deleted.extend(object_names)
for object_name in object_names:
self.objects.pop(object_name, None)
return True
@pytest.fixture
def project(workspace):
from plane.db.models import Project, ProjectMember
project = Project.objects.create(name="Dedup Project", identifier="DEDUP", workspace=workspace)
ProjectMember.objects.create(project=project, member=workspace.owner)
return project
@pytest.fixture
def fake_storage():
objects = {
"workspace/a.txt": {"body": b"same payload", "type": "text/plain"},
"workspace/b.txt": {"body": b"same payload", "type": "text/plain"},
}
return FakeStorage(objects)
def create_asset(workspace, project, object_key):
return FileAsset.objects.create(
workspace=workspace,
project=project,
asset=object_key,
size=12,
attributes={"name": object_key.rsplit("/", 1)[-1], "type": "text/plain", "size": 12},
entity_type=FileAsset.EntityTypeContext.ISSUE_ATTACHMENT,
)
@pytest.mark.django_db
def test_finalize_uploaded_file_asset_reuses_existing_blob(workspace, project, fake_storage):
first_asset = create_asset(workspace, project, "workspace/a.txt")
duplicate_asset = create_asset(workspace, project, "workspace/b.txt")
with patch("plane.utils.file_dedup.S3Storage", return_value=fake_storage):
first_result = finalize_uploaded_file_asset(first_asset)
duplicate_result = finalize_uploaded_file_asset(duplicate_asset)
first_asset.refresh_from_db()
duplicate_asset.refresh_from_db()
blob = StoredBlob.objects.get()
assert first_result.deduplicated is False
assert duplicate_result.deduplicated is True
assert duplicate_result.deleted_duplicate_object is True
assert first_asset.blob_id == blob.id
assert duplicate_asset.blob_id == blob.id
assert first_asset.asset.name == "workspace/a.txt"
assert duplicate_asset.asset.name == "workspace/a.txt"
assert blob.ref_count == 2
assert fake_storage.deleted == ["workspace/b.txt"]
@pytest.mark.django_db
def test_release_file_asset_blob_deletes_canonical_object_after_last_reference(workspace, project, fake_storage):
first_asset = create_asset(workspace, project, "workspace/a.txt")
duplicate_asset = create_asset(workspace, project, "workspace/b.txt")
with patch("plane.utils.file_dedup.S3Storage", return_value=fake_storage):
finalize_uploaded_file_asset(first_asset)
finalize_uploaded_file_asset(duplicate_asset)
first_release = release_file_asset_blob(first_asset)
second_release = release_file_asset_blob(duplicate_asset)
blob = StoredBlob.objects.get()
assert first_release.had_blob is True
assert first_release.deleted_object is False
assert second_release.had_blob is True
assert second_release.deleted_object is True
assert second_release.object_key == "workspace/a.txt"
assert blob.ref_count == 0
assert blob.status == StoredBlob.Status.ORPHANED
@pytest.mark.django_db
def test_attach_existing_blob_reuses_canonical_object_without_copy(workspace, project, fake_storage):
source_asset = create_asset(workspace, project, "workspace/a.txt")
target_asset = create_asset(workspace, project, "workspace/c.txt")
with patch("plane.utils.file_dedup.S3Storage", return_value=fake_storage):
finalize_uploaded_file_asset(source_asset)
attached = attach_existing_blob_to_file_asset(source_asset, target_asset)
source_asset.refresh_from_db()
target_asset.refresh_from_db()
blob = StoredBlob.objects.get()
assert attached is True
assert source_asset.blob_id == blob.id
assert target_asset.blob_id == blob.id
assert target_asset.asset.name == "workspace/a.txt"
assert blob.ref_count == 2
assert fake_storage.deleted == []

View File

@ -0,0 +1,317 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
import hashlib
from dataclasses import dataclass
from botocore.exceptions import ClientError
from django.db import IntegrityError, transaction
from plane.db.models import FileAsset, StoredBlob
from plane.settings.storage import S3Storage
from plane.utils.exception_logger import log_exception
class UploadedObjectMissing(Exception):
"""Raised when a FileAsset points to a storage object that is not available."""
@dataclass(frozen=True)
class FileBlobFinalizeResult:
asset: FileAsset
blob: StoredBlob
sha256: str
deduplicated: bool
deleted_duplicate_object: bool
@dataclass(frozen=True)
class FileBlobReleaseResult:
had_blob: bool
deleted_object: bool
object_key: str | None = None
def supports_blob_dedup(asset: FileAsset) -> bool:
return bool(asset.workspace_id)
def _object_key(asset: FileAsset) -> str:
return str(asset.asset.name or asset.asset)
def _get_object_metadata(storage: S3Storage, object_key: str) -> dict:
metadata = storage.get_object_metadata(object_name=object_key)
if not metadata:
raise UploadedObjectMissing(f"Storage object not found: {object_key}")
return metadata
def _calculate_object_sha256(storage: S3Storage, object_key: str) -> str:
try:
response = storage.s3_client.get_object(Bucket=storage.aws_storage_bucket_name, Key=object_key)
except ClientError as exc:
raise UploadedObjectMissing(f"Storage object not found: {object_key}") from exc
digest = hashlib.sha256()
body = response["Body"]
try:
for chunk in body.iter_chunks(chunk_size=1024 * 1024):
if chunk:
digest.update(chunk)
finally:
body.close()
return digest.hexdigest()
def _resolve_size(asset: FileAsset, metadata: dict) -> int:
metadata_size = metadata.get("ContentLength")
if metadata_size is not None:
return int(metadata_size)
attr_size = (asset.attributes or {}).get("size")
if attr_size is not None:
return int(float(attr_size))
return int(asset.size or 0)
def _resolve_mime_type(asset: FileAsset, metadata: dict) -> str:
return str((asset.attributes or {}).get("type") or metadata.get("ContentType") or "")
def finalize_uploaded_file_asset(asset: FileAsset, request=None) -> FileBlobFinalizeResult:
"""
Attach a confirmed upload to a canonical StoredBlob.
The uploaded object is treated as a temporary candidate until its SHA-256 is
known. If the same blob already exists in the workspace, the FileAsset is
repointed to the canonical object and the duplicate candidate object is
removed from storage.
"""
if not asset.workspace_id:
raise ValueError("FileAsset workspace is required for blob deduplication.")
storage = S3Storage(request=request)
candidate_key = _object_key(asset)
candidate_metadata = _get_object_metadata(storage, candidate_key)
candidate_sha256 = _calculate_object_sha256(storage, candidate_key)
candidate_size = _resolve_size(asset, candidate_metadata)
candidate_mime_type = _resolve_mime_type(asset, candidate_metadata)
duplicate_object_key = None
released_object_key = None
with transaction.atomic():
locked_asset = FileAsset.objects.select_for_update().get(pk=asset.pk)
previous_blob_id = locked_asset.blob_id
blob = (
StoredBlob.objects.select_for_update()
.filter(
workspace_id=locked_asset.workspace_id,
sha256=candidate_sha256,
size=candidate_size,
status=StoredBlob.Status.ACTIVE,
deleted_at__isnull=True,
)
.first()
)
if blob is None:
try:
with transaction.atomic():
blob = StoredBlob.objects.create(
workspace_id=locked_asset.workspace_id,
sha256=candidate_sha256,
size=candidate_size,
mime_type=candidate_mime_type,
canonical_object_key=candidate_key,
status=StoredBlob.Status.ACTIVE,
ref_count=0,
storage_metadata=candidate_metadata,
)
except IntegrityError:
blob = (
StoredBlob.objects.select_for_update()
.filter(
workspace_id=locked_asset.workspace_id,
sha256=candidate_sha256,
size=candidate_size,
status=StoredBlob.Status.ACTIVE,
deleted_at__isnull=True,
)
.get()
)
deduplicated = blob.canonical_object_key != candidate_key
if deduplicated:
duplicate_object_key = candidate_key
if previous_blob_id and previous_blob_id != blob.id:
previous_blob = StoredBlob.objects.select_for_update().filter(pk=previous_blob_id).first()
if previous_blob:
previous_blob.ref_count = max(previous_blob.ref_count - 1, 0)
if previous_blob.ref_count == 0 and previous_blob.status == StoredBlob.Status.ACTIVE:
previous_blob.status = StoredBlob.Status.ORPHANED
released_object_key = previous_blob.canonical_object_key
previous_blob.save(update_fields=["ref_count", "status", "updated_at"])
if previous_blob_id != blob.id:
blob.ref_count += 1
blob.save(update_fields=["ref_count", "updated_at"])
attributes = dict(locked_asset.attributes or {})
attributes["size"] = candidate_size
attributes["type"] = candidate_mime_type
attributes["sha256"] = candidate_sha256
attributes["blob_id"] = str(blob.id)
attributes["deduplicated"] = deduplicated
locked_asset.blob = blob
locked_asset.asset = blob.canonical_object_key
locked_asset.size = candidate_size
locked_asset.attributes = attributes
locked_asset.storage_metadata = blob.storage_metadata or candidate_metadata
locked_asset.is_uploaded = True
locked_asset.save(
update_fields=[
"blob",
"asset",
"size",
"attributes",
"storage_metadata",
"is_uploaded",
"updated_at",
]
)
deleted_duplicate_object = False
object_keys_to_delete = [key for key in {duplicate_object_key, released_object_key} if key]
if object_keys_to_delete:
deleted_duplicate_object = storage.delete_files(object_names=object_keys_to_delete)
locked_asset.refresh_from_db()
asset.blob_id = locked_asset.blob_id
asset.asset = locked_asset.asset
asset.size = locked_asset.size
asset.attributes = locked_asset.attributes
asset.storage_metadata = locked_asset.storage_metadata
asset.is_uploaded = locked_asset.is_uploaded
return FileBlobFinalizeResult(
asset=locked_asset,
blob=blob,
sha256=candidate_sha256,
deduplicated=deduplicated,
deleted_duplicate_object=deleted_duplicate_object,
)
def confirm_uploaded_file_asset(asset: FileAsset, request=None, attributes: dict | None = None) -> FileAsset:
if attributes is not None:
asset.attributes = attributes
asset.save(update_fields=["attributes", "updated_at"])
if supports_blob_dedup(asset):
return finalize_uploaded_file_asset(asset, request=request).asset
storage = S3Storage(request=request)
asset.storage_metadata = storage.get_object_metadata(object_name=_object_key(asset)) or asset.storage_metadata
asset.is_uploaded = True
asset.save(update_fields=["storage_metadata", "is_uploaded", "updated_at"])
return asset
def release_file_asset_blob(
asset: FileAsset,
request=None,
delete_untracked_object: bool = False,
) -> FileBlobReleaseResult:
"""
Release the blob reference held by a FileAsset.
The canonical object is deleted only when the last blob-backed FileAsset
releases it. Legacy non-blob assets are left intact unless the caller marks
them as temporary/untracked.
"""
storage = S3Storage(request=request)
delete_object_key = None
had_blob = False
with transaction.atomic():
locked_asset = FileAsset.objects.select_for_update().get(pk=asset.pk)
current_key = _object_key(locked_asset)
if locked_asset.blob_id:
had_blob = True
blob = StoredBlob.objects.select_for_update().get(pk=locked_asset.blob_id)
blob.ref_count = max(blob.ref_count - 1, 0)
if blob.ref_count == 0 and blob.status == StoredBlob.Status.ACTIVE:
blob.status = StoredBlob.Status.ORPHANED
delete_object_key = blob.canonical_object_key
blob.save(update_fields=["ref_count", "status", "updated_at"])
locked_asset.blob = None
locked_asset.save(update_fields=["blob", "updated_at"])
elif delete_untracked_object and current_key:
delete_object_key = current_key
if had_blob:
asset.blob_id = None
deleted_object = False
if delete_object_key:
try:
deleted_object = storage.delete_files(object_names=[delete_object_key])
except Exception as exc:
log_exception(exc)
deleted_object = False
return FileBlobReleaseResult(had_blob=had_blob, deleted_object=deleted_object, object_key=delete_object_key)
def attach_existing_blob_to_file_asset(source: FileAsset, target: FileAsset) -> bool:
if not source.blob_id:
source.refresh_from_db(fields=["blob"])
if not source.blob_id:
return False
with transaction.atomic():
source_blob = StoredBlob.objects.select_for_update().get(pk=source.blob_id)
locked_target = FileAsset.objects.select_for_update().get(pk=target.pk)
if locked_target.blob_id == source_blob.id:
return True
source_blob.ref_count += 1
source_blob.save(update_fields=["ref_count", "updated_at"])
attributes = dict(locked_target.attributes or {})
attributes["sha256"] = source_blob.sha256
attributes["blob_id"] = str(source_blob.id)
attributes["deduplicated"] = True
locked_target.blob = source_blob
locked_target.asset = source_blob.canonical_object_key
locked_target.size = source_blob.size
locked_target.attributes = attributes
locked_target.storage_metadata = source_blob.storage_metadata
locked_target.is_uploaded = True
locked_target.save(
update_fields=[
"blob",
"asset",
"size",
"attributes",
"storage_metadata",
"is_uploaded",
"updated_at",
]
)
target.blob_id = source.blob_id
target.asset = source_blob.canonical_object_key
return True