diff --git a/plane-src/apps/api/plane/api/serializers/asset.py b/plane-src/apps/api/plane/api/serializers/asset.py index 363b5eb..bf80c93 100644 --- a/plane-src/apps/api/plane/api/serializers/asset.py +++ b/plane-src/apps/api/plane/api/serializers/asset.py @@ -116,6 +116,7 @@ class FileAssetSerializer(BaseSerializer): "page", "draft_issue", "user", + "blob", "is_deleted", "deleted_at", "storage_metadata", diff --git a/plane-src/apps/api/plane/api/serializers/issue.py b/plane-src/apps/api/plane/api/serializers/issue.py index c0451bf..654124a 100644 --- a/plane-src/apps/api/plane/api/serializers/issue.py +++ b/plane-src/apps/api/plane/api/serializers/issue.py @@ -678,6 +678,7 @@ class IssueAttachmentSerializer(BaseSerializer): "workspace", "project", "issue", + "blob", "updated_by", "updated_at", ] diff --git a/plane-src/apps/api/plane/api/views/asset.py b/plane-src/apps/api/plane/api/views/asset.py index 318ff87..3a586fb 100644 --- a/plane-src/apps/api/plane/api/views/asset.py +++ b/plane-src/apps/api/plane/api/views/asset.py @@ -15,7 +15,6 @@ from rest_framework.response import Response from drf_spectacular.utils import OpenApiExample, OpenApiRequest # Module Imports -from plane.bgtasks.storage_metadata_task import get_asset_object_metadata from plane.settings.storage import S3Storage from plane.db.models import FileAsset, User, Workspace from plane.api.views.base import BaseAPIView @@ -44,6 +43,11 @@ from plane.utils.openapi import ( ) from plane.utils.exception_logger import log_exception from plane.utils.upload_limits import resolve_workspace_upload_size_limit +from plane.utils.file_dedup import ( + UploadedObjectMissing, + confirm_uploaded_file_asset, + release_file_asset_blob, +) class UserAssetEndpoint(BaseAPIView): @@ -53,9 +57,11 @@ class UserAssetEndpoint(BaseAPIView): asset = FileAsset.objects.filter(id=asset_id).first() if asset is None: return + if not asset.is_uploaded: + release_file_asset_blob(asset, delete_untracked_object=True) asset.is_deleted = 
True asset.deleted_at = timezone.now() - asset.save(update_fields=["is_deleted", "deleted_at"]) + asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) return def entity_asset_delete(self, entity_type, asset, request): @@ -209,15 +215,17 @@ class UserAssetEndpoint(BaseAPIView): """ # get the asset id asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id) - # get the storage metadata - asset.is_uploaded = True - # get the storage metadata - if not asset.storage_metadata: - get_asset_object_metadata.delay(asset_id=str(asset_id)) - # update the attributes - asset.attributes = request.data.get("attributes", asset.attributes) - # save the asset - asset.save(update_fields=["is_uploaded", "attributes"]) + try: + confirm_uploaded_file_asset( + asset, + request=request, + attributes=request.data.get("attributes", asset.attributes), + ) + except UploadedObjectMissing: + return Response( + {"error": "The uploaded asset object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) return Response(status=status.HTTP_204_NO_CONTENT) @asset_docs( @@ -236,11 +244,13 @@ class UserAssetEndpoint(BaseAPIView): This performs a soft delete by marking the asset as deleted and updating the user's profile. 
""" asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id) + if not asset.is_uploaded: + release_file_asset_blob(asset, request=request, delete_untracked_object=True) asset.is_deleted = True asset.deleted_at = timezone.now() # get the entity and save the asset id for the request field self.entity_asset_delete(entity_type=asset.entity_type, asset=asset, request=request) - asset.save(update_fields=["is_deleted", "deleted_at"]) + asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) return Response(status=status.HTTP_204_NO_CONTENT) @@ -251,9 +261,11 @@ class UserServerAssetEndpoint(BaseAPIView): asset = FileAsset.objects.filter(id=asset_id).first() if asset is None: return + if not asset.is_uploaded: + release_file_asset_blob(asset, delete_untracked_object=True) asset.is_deleted = True asset.deleted_at = timezone.now() - asset.save(update_fields=["is_deleted", "deleted_at"]) + asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) return def entity_asset_delete(self, entity_type, asset, request): @@ -365,15 +377,17 @@ class UserServerAssetEndpoint(BaseAPIView): """ # get the asset id asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id) - # get the storage metadata - asset.is_uploaded = True - # get the storage metadata - if not asset.storage_metadata: - get_asset_object_metadata.delay(asset_id=str(asset_id)) - # update the attributes - asset.attributes = request.data.get("attributes", asset.attributes) - # save the asset - asset.save(update_fields=["is_uploaded", "attributes"]) + try: + confirm_uploaded_file_asset( + asset, + request=request, + attributes=request.data.get("attributes", asset.attributes), + ) + except UploadedObjectMissing: + return Response( + {"error": "The uploaded asset object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) return Response(status=status.HTTP_204_NO_CONTENT) @asset_docs( @@ -393,11 +407,13 @@ class UserServerAssetEndpoint(BaseAPIView): asset as 
deleted and updating the user's profile. """ asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id) + if not asset.is_uploaded: + release_file_asset_blob(asset, request=request, delete_untracked_object=True) asset.is_deleted = True asset.deleted_at = timezone.now() # get the entity and save the asset id for the request field self.entity_asset_delete(entity_type=asset.entity_type, asset=asset, request=request) - asset.save(update_fields=["is_deleted", "deleted_at"]) + asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) return Response(status=status.HTTP_204_NO_CONTENT) @@ -604,15 +620,21 @@ class GenericAssetEndpoint(BaseAPIView): try: asset = FileAsset.objects.get(id=asset_id, workspace__slug=slug, is_deleted=False) - # Update is_uploaded status - asset.is_uploaded = request.data.get("is_uploaded", asset.is_uploaded) - - # Update storage metadata if not present - if not asset.storage_metadata: - get_asset_object_metadata.delay(asset_id=str(asset_id)) - - asset.save(update_fields=["is_uploaded"]) + if request.data.get("is_uploaded", asset.is_uploaded): + confirm_uploaded_file_asset( + asset, + request=request, + attributes=request.data.get("attributes"), + ) + else: + asset.is_uploaded = False + asset.save(update_fields=["is_uploaded", "updated_at"]) return Response(status=status.HTTP_204_NO_CONTENT) except FileAsset.DoesNotExist: return Response({"error": "Asset not found"}, status=status.HTTP_404_NOT_FOUND) + except UploadedObjectMissing: + return Response( + {"error": "The uploaded asset object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) diff --git a/plane-src/apps/api/plane/api/views/issue.py b/plane-src/apps/api/plane/api/views/issue.py index 1d36d7a..393fec1 100644 --- a/plane-src/apps/api/plane/api/views/issue.py +++ b/plane-src/apps/api/plane/api/views/issue.py @@ -82,11 +82,11 @@ from plane.db.models import ( Workspace, ) from plane.settings.storage import S3Storage -from 
plane.bgtasks.storage_metadata_task import get_asset_object_metadata from .base import BaseAPIView from plane.utils.host import base_host from plane.utils.issue_relation_mapper import get_actual_relation from plane.utils.attachment_preview import attachment_object_exists, get_attachment_preview_response +from plane.utils.file_dedup import finalize_uploaded_file_asset, release_file_asset_blob, UploadedObjectMissing from plane.utils.upload_limits import resolve_workspace_upload_size_limit from plane.bgtasks.webhook_task import model_activity from plane.app.permissions import ROLE @@ -2020,9 +2020,11 @@ class IssueAttachmentDetailAPIEndpoint(BaseAPIView): ) issue_attachment = FileAsset.objects.get(pk=pk, workspace__slug=slug, project_id=project_id) + if not issue_attachment.is_uploaded: + release_file_asset_blob(issue_attachment, request=request, delete_untracked_object=True) issue_attachment.is_deleted = True issue_attachment.deleted_at = timezone.now() - issue_attachment.save() + issue_attachment.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) issue_activity.delay( type="attachment.activity.deleted", @@ -2036,10 +2038,6 @@ class IssueAttachmentDetailAPIEndpoint(BaseAPIView): origin=base_host(request=request, is_app=True), ) - # Get the storage metadata - if not issue_attachment.storage_metadata: - get_asset_object_metadata.delay(str(issue_attachment.id)) - issue_attachment.save() return Response(status=status.HTTP_204_NO_CONTENT) @issue_attachment_docs( @@ -2174,14 +2172,16 @@ class IssueAttachmentDetailAPIEndpoint(BaseAPIView): origin=base_host(request=request, is_app=True), ) - # Update the attachment - issue_attachment.is_uploaded = True issue_attachment.created_by = request.user + issue_attachment.save(update_fields=["created_by", "updated_at"]) - # Get the storage metadata - if not issue_attachment.storage_metadata: - get_asset_object_metadata.delay(str(issue_attachment.id)) - issue_attachment.save() + try: + 
finalize_uploaded_file_asset(issue_attachment, request=request) + except UploadedObjectMissing: + return Response( + {"error": "The uploaded attachment object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/plane-src/apps/api/plane/app/serializers/asset.py b/plane-src/apps/api/plane/app/serializers/asset.py index 1de5961..9fda0f9 100644 --- a/plane-src/apps/api/plane/app/serializers/asset.py +++ b/plane-src/apps/api/plane/app/serializers/asset.py @@ -10,4 +10,4 @@ class FileAssetSerializer(BaseSerializer): class Meta: model = FileAsset fields = "__all__" - read_only_fields = ["created_by", "updated_by", "created_at", "updated_at"] + read_only_fields = ["created_by", "updated_by", "created_at", "updated_at", "blob"] diff --git a/plane-src/apps/api/plane/app/serializers/issue.py b/plane-src/apps/api/plane/app/serializers/issue.py index 4600d1d..af25400 100644 --- a/plane-src/apps/api/plane/app/serializers/issue.py +++ b/plane-src/apps/api/plane/app/serializers/issue.py @@ -630,6 +630,7 @@ class IssueAttachmentSerializer(BaseSerializer): "workspace", "project", "issue", + "blob", ] diff --git a/plane-src/apps/api/plane/app/views/asset/base.py b/plane-src/apps/api/plane/app/views/asset/base.py index 5b55a76..6cd9a81 100644 --- a/plane-src/apps/api/plane/app/views/asset/base.py +++ b/plane-src/apps/api/plane/app/views/asset/base.py @@ -11,6 +11,7 @@ from rest_framework.parsers import MultiPartParser, FormParser, JSONParser from ..base import BaseAPIView, BaseViewSet from plane.db.models import FileAsset, Workspace from plane.app.serializers import FileAssetSerializer +from plane.utils.file_dedup import UploadedObjectMissing, confirm_uploaded_file_asset, release_file_asset_blob class FileAssetEndpoint(BaseAPIView): @@ -37,15 +38,24 @@ class FileAssetEndpoint(BaseAPIView): if serializer.is_valid(): # Get the workspace workspace = Workspace.objects.get(slug=slug) - 
serializer.save(workspace_id=workspace.id) + asset = serializer.save(workspace_id=workspace.id) + try: + confirm_uploaded_file_asset(asset, request=request) + except UploadedObjectMissing: + return Response( + {"error": "The uploaded asset object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) def delete(self, request, workspace_id, asset_key): asset_key = str(workspace_id) + "/" + asset_key file_asset = FileAsset.objects.get(asset=asset_key) + if not file_asset.is_uploaded: + release_file_asset_blob(file_asset, request=request, delete_untracked_object=True) file_asset.is_deleted = True - file_asset.save(update_fields=["is_deleted"]) + file_asset.save(update_fields=["is_deleted", "updated_at"]) return Response(status=status.HTTP_204_NO_CONTENT) @@ -75,12 +85,21 @@ class UserAssetsEndpoint(BaseAPIView): def post(self, request): serializer = FileAssetSerializer(data=request.data) if serializer.is_valid(): - serializer.save() + asset = serializer.save() + try: + confirm_uploaded_file_asset(asset, request=request) + except UploadedObjectMissing: + return Response( + {"error": "The uploaded asset object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) def delete(self, request, asset_key): file_asset = FileAsset.objects.get(asset=asset_key, created_by=request.user) + if not file_asset.is_uploaded: + release_file_asset_blob(file_asset, request=request, delete_untracked_object=True) file_asset.is_deleted = True - file_asset.save(update_fields=["is_deleted"]) + file_asset.save(update_fields=["is_deleted", "updated_at"]) return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/plane-src/apps/api/plane/app/views/asset/v2.py 
b/plane-src/apps/api/plane/app/views/asset/v2.py index 3d8d052..0b55a7a 100644 --- a/plane-src/apps/api/plane/app/views/asset/v2.py +++ b/plane-src/apps/api/plane/app/views/asset/v2.py @@ -22,9 +22,14 @@ from plane.db.models import FileAsset, Workspace, Project, User from plane.settings.storage import S3Storage from plane.app.permissions import allow_permission, ROLE from plane.utils.cache import invalidate_cache_directly -from plane.bgtasks.storage_metadata_task import get_asset_object_metadata from plane.throttles.asset import AssetRateThrottle from plane.utils.upload_limits import resolve_workspace_upload_size_limit +from plane.utils.file_dedup import ( + UploadedObjectMissing, + attach_existing_blob_to_file_asset, + confirm_uploaded_file_asset, + release_file_asset_blob, +) class UserAssetsV2Endpoint(BaseAPIView): @@ -34,9 +39,11 @@ class UserAssetsV2Endpoint(BaseAPIView): asset = FileAsset.objects.filter(id=asset_id).first() if asset is None: return + if not asset.is_uploaded: + release_file_asset_blob(asset, delete_untracked_object=True) asset.is_deleted = True asset.deleted_at = timezone.now() - asset.save(update_fields=["is_deleted", "deleted_at"]) + asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) return def entity_asset_save(self, asset_id, entity_type, asset, request): @@ -171,11 +178,17 @@ class UserAssetsV2Endpoint(BaseAPIView): def patch(self, request, asset_id): # get the asset id asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id) - # get the storage metadata - asset.is_uploaded = True - # get the storage metadata - if not asset.storage_metadata: - get_asset_object_metadata.delay(asset_id=str(asset_id)) + try: + asset = confirm_uploaded_file_asset( + asset, + request=request, + attributes=request.data.get("attributes", asset.attributes), + ) + except UploadedObjectMissing: + return Response( + {"error": "The uploaded asset object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) # get 
the entity and save the asset id for the request field self.entity_asset_save( asset_id=asset_id, @@ -183,19 +196,17 @@ class UserAssetsV2Endpoint(BaseAPIView): asset=asset, request=request, ) - # update the attributes - asset.attributes = request.data.get("attributes", asset.attributes) - # save the asset - asset.save(update_fields=["is_uploaded", "attributes"]) return Response(status=status.HTTP_204_NO_CONTENT) def delete(self, request, asset_id): asset = FileAsset.objects.get(id=asset_id, user_id=request.user.id) + if not asset.is_uploaded: + release_file_asset_blob(asset, request=request, delete_untracked_object=True) asset.is_deleted = True asset.deleted_at = timezone.now() # get the entity and save the asset id for the request field self.entity_asset_delete(entity_type=asset.entity_type, asset=asset, request=request) - asset.save(update_fields=["is_deleted", "deleted_at"]) + asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) return Response(status=status.HTTP_204_NO_CONTENT) @@ -239,10 +250,12 @@ class WorkspaceFileAssetEndpoint(BaseAPIView): # Check if the asset exists if asset is None: return + if not asset.is_uploaded: + release_file_asset_blob(asset, delete_untracked_object=True) # Mark the asset as deleted asset.is_deleted = True asset.deleted_at = timezone.now() - asset.save(update_fields=["is_deleted", "deleted_at"]) + asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) return def entity_asset_save(self, asset_id, entity_type, asset, request): @@ -380,11 +393,17 @@ class WorkspaceFileAssetEndpoint(BaseAPIView): def patch(self, request, slug, asset_id): # get the asset id asset = FileAsset.objects.get(id=asset_id, workspace__slug=slug) - # get the storage metadata - asset.is_uploaded = True - # get the storage metadata - if not asset.storage_metadata: - get_asset_object_metadata.delay(asset_id=str(asset_id)) + try: + asset = confirm_uploaded_file_asset( + asset, + request=request, + 
attributes=request.data.get("attributes", asset.attributes), + ) + except UploadedObjectMissing: + return Response( + {"error": "The uploaded asset object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) # get the entity and save the asset id for the request field self.entity_asset_save( asset_id=asset_id, @@ -392,19 +411,17 @@ class WorkspaceFileAssetEndpoint(BaseAPIView): asset=asset, request=request, ) - # update the attributes - asset.attributes = request.data.get("attributes", asset.attributes) - # save the asset - asset.save(update_fields=["is_uploaded", "attributes"]) return Response(status=status.HTTP_204_NO_CONTENT) def delete(self, request, slug, asset_id): asset = FileAsset.objects.get(id=asset_id, workspace__slug=slug) + if not asset.is_uploaded: + release_file_asset_blob(asset, request=request, delete_untracked_object=True) asset.is_deleted = True asset.deleted_at = timezone.now() # get the entity and save the asset id for the request field self.entity_asset_delete(entity_type=asset.entity_type, asset=asset, request=request) - asset.save(update_fields=["is_deleted", "deleted_at"]) + asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) return Response(status=status.HTTP_204_NO_CONTENT) def get(self, request, slug, asset_id): @@ -581,27 +598,30 @@ class ProjectAssetEndpoint(BaseAPIView): def patch(self, request, slug, project_id, pk): # get the asset id asset = FileAsset.objects.get(id=pk, workspace__slug=slug, project_id=project_id) - # get the storage metadata - asset.is_uploaded = True - # get the storage metadata - if not asset.storage_metadata: - get_asset_object_metadata.delay(asset_id=str(pk)) - - # update the attributes - asset.attributes = request.data.get("attributes", asset.attributes) - # save the asset - asset.save(update_fields=["is_uploaded", "attributes"]) + try: + confirm_uploaded_file_asset( + asset, + request=request, + attributes=request.data.get("attributes", asset.attributes), + ) + except 
UploadedObjectMissing: + return Response( + {"error": "The uploaded asset object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) return Response(status=status.HTTP_204_NO_CONTENT) @allow_permission([ROLE.ADMIN, ROLE.MEMBER, ROLE.GUEST]) def delete(self, request, slug, project_id, pk): # Get the asset asset = FileAsset.objects.get(id=pk, workspace__slug=slug, project_id=project_id) + if not asset.is_uploaded: + release_file_asset_blob(asset, request=request, delete_untracked_object=True) # Check deleted assets asset.is_deleted = True asset.deleted_at = timezone.now() # Save the asset - asset.save(update_fields=["is_deleted", "deleted_at"]) + asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) return Response(status=status.HTTP_204_NO_CONTENT) @allow_permission([ROLE.ADMIN, ROLE.MEMBER, ROLE.GUEST]) @@ -753,7 +773,7 @@ class DuplicateAssetEndpoint(BaseAPIView): return Response({"error": "Project not found"}, status=status.HTTP_404_NOT_FOUND) storage = S3Storage(request=request) - original_asset = FileAsset.objects.filter(id=asset_id, is_uploaded=True).first() + original_asset = FileAsset.objects.filter(id=asset_id, workspace=workspace, is_uploaded=True).first() if not original_asset: return Response({"error": "Asset not found"}, status=status.HTTP_404_NOT_FOUND) @@ -774,9 +794,15 @@ class DuplicateAssetEndpoint(BaseAPIView): storage_metadata=original_asset.storage_metadata, **self.get_entity_id_field(entity_type=entity_type, entity_id=entity_id), ) - storage.copy_object(original_asset.asset, destination_key) - # Update the is_uploaded field for all newly created assets - FileAsset.objects.filter(id=duplicated_asset.id).update(is_uploaded=True) + if not attach_existing_blob_to_file_asset(original_asset, duplicated_asset): + storage.copy_object(original_asset.asset, destination_key) + try: + confirm_uploaded_file_asset(duplicated_asset, request=request) + except UploadedObjectMissing: + return Response( + {"error": "The 
source asset object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) return Response({"asset_id": str(duplicated_asset.id)}, status=status.HTTP_200_OK) diff --git a/plane-src/apps/api/plane/app/views/issue/attachment.py b/plane-src/apps/api/plane/app/views/issue/attachment.py index a66b6a8..f825ac2 100644 --- a/plane-src/apps/api/plane/app/views/issue/attachment.py +++ b/plane-src/apps/api/plane/app/views/issue/attachment.py @@ -23,10 +23,10 @@ from plane.db.models import FileAsset, Workspace from plane.bgtasks.issue_activities_task import issue_activity from plane.app.permissions import allow_permission, ROLE from plane.settings.storage import S3Storage -from plane.bgtasks.storage_metadata_task import get_asset_object_metadata from plane.utils.host import base_host from plane.utils.upload_limits import resolve_workspace_upload_size_limit from plane.utils.attachment_preview import attachment_object_exists, get_attachment_preview_response +from plane.utils.file_dedup import finalize_uploaded_file_asset, release_file_asset_blob, UploadedObjectMissing class IssueAttachmentEndpoint(BaseAPIView): @@ -69,7 +69,9 @@ class IssueAttachmentEndpoint(BaseAPIView): {"error": "Issue attachment not found."}, status=status.HTTP_404_NOT_FOUND, ) - issue_attachment.asset.delete(save=False) + release_result = release_file_asset_blob(issue_attachment, request=request) + if not release_result.had_blob: + issue_attachment.asset.delete(save=False) issue_attachment.delete() issue_activity.delay( type="attachment.activity.deleted", @@ -155,9 +157,11 @@ class IssueAttachmentV2Endpoint(BaseAPIView): @allow_permission([ROLE.ADMIN], creator=True, model=FileAsset) def delete(self, request, slug, project_id, issue_id, pk): issue_attachment = FileAsset.objects.get(pk=pk, workspace__slug=slug, project_id=project_id) + if not issue_attachment.is_uploaded: + release_file_asset_blob(issue_attachment, request=request, delete_untracked_object=True) 
issue_attachment.is_deleted = True issue_attachment.deleted_at = timezone.now() - issue_attachment.save() + issue_attachment.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) issue_activity.delay( type="attachment.activity.deleted", @@ -225,12 +229,14 @@ class IssueAttachmentV2Endpoint(BaseAPIView): origin=base_host(request=request, is_app=True), ) - # Update the attachment - issue_attachment.is_uploaded = True issue_attachment.created_by = request.user + issue_attachment.save(update_fields=["created_by", "updated_at"]) - # Get the storage metadata - if not issue_attachment.storage_metadata: - get_asset_object_metadata.delay(str(issue_attachment.id)) - issue_attachment.save() + try: + finalize_uploaded_file_asset(issue_attachment, request=request) + except UploadedObjectMissing: + return Response( + {"error": "The uploaded attachment object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/plane-src/apps/api/plane/bgtasks/copy_s3_object.py b/plane-src/apps/api/plane/bgtasks/copy_s3_object.py index 742966a..0b349ad 100644 --- a/plane-src/apps/api/plane/bgtasks/copy_s3_object.py +++ b/plane-src/apps/api/plane/bgtasks/copy_s3_object.py @@ -17,6 +17,7 @@ from plane.utils.exception_logger import log_exception from plane.settings.storage import S3Storage from celery import shared_task from plane.utils.url import normalize_url_path +from plane.utils.file_dedup import attach_existing_blob_to_file_asset, confirm_uploaded_file_asset def get_entity_id_field(entity_type, entity_id): @@ -108,15 +109,16 @@ def copy_assets(entity, entity_identifier, project_id, asset_ids, user_id): storage_metadata=original_asset.storage_metadata, **get_entity_id_field(original_asset.entity_type, entity_identifier), ) - storage.copy_object(original_asset.asset, destination_key) + if not attach_existing_blob_to_file_asset(original_asset, duplicated_asset): + 
storage.copy_object(original_asset.asset, destination_key) + confirm_uploaded_file_asset(duplicated_asset) + duplicated_assets.append( { "new_asset_id": str(duplicated_asset.id), "old_asset_id": str(original_asset.id), } ) - if duplicated_assets: - FileAsset.objects.filter(pk__in=[item["new_asset_id"] for item in duplicated_assets]).update(is_uploaded=True) return duplicated_assets diff --git a/plane-src/apps/api/plane/bgtasks/file_asset_task.py b/plane-src/apps/api/plane/bgtasks/file_asset_task.py index e54a754..bbad46b 100644 --- a/plane-src/apps/api/plane/bgtasks/file_asset_task.py +++ b/plane-src/apps/api/plane/bgtasks/file_asset_task.py @@ -15,12 +15,16 @@ from celery import shared_task # Module imports from plane.db.models import FileAsset +from plane.utils.file_dedup import release_file_asset_blob @shared_task def delete_unuploaded_file_asset(): """This task deletes unuploaded file assets older than a certain number of days.""" - FileAsset.objects.filter( + stale_assets = FileAsset.objects.filter( Q(created_at__lt=timezone.now() - timedelta(days=int(os.environ.get("UNUPLOADED_ASSET_DELETE_DAYS", "7")))) & Q(is_uploaded=False) - ).delete() + ) + for asset in stale_assets.iterator(): + release_file_asset_blob(asset, delete_untracked_object=True) + asset.delete() diff --git a/plane-src/apps/api/plane/db/migrations/0128_stored_blob_dedup.py b/plane-src/apps/api/plane/db/migrations/0128_stored_blob_dedup.py new file mode 100644 index 0000000..53fbc67 --- /dev/null +++ b/plane-src/apps/api/plane/db/migrations/0128_stored_blob_dedup.py @@ -0,0 +1,112 @@ +# Generated by Codex on 2026-04-26 + +import uuid + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ("db", "0127_issue_detail_layout"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name="StoredBlob", + fields=[ + 
("created_at", models.DateTimeField(auto_now_add=True, verbose_name="Created At")), + ("updated_at", models.DateTimeField(auto_now=True, verbose_name="Last Modified At")), + ("deleted_at", models.DateTimeField(blank=True, null=True, verbose_name="Deleted At")), + ( + "id", + models.UUIDField( + db_index=True, + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + unique=True, + ), + ), + ("sha256", models.CharField(db_index=True, max_length=64)), + ("size", models.PositiveBigIntegerField(default=0)), + ("mime_type", models.CharField(blank=True, default="", max_length=255)), + ("canonical_object_key", models.CharField(max_length=800)), + ( + "status", + models.CharField( + choices=[("active", "Active"), ("orphaned", "Orphaned"), ("missing", "Missing")], + default="active", + max_length=32, + ), + ), + ("ref_count", models.PositiveIntegerField(default=0)), + ("storage_metadata", models.JSONField(blank=True, default=dict, null=True)), + ( + "created_by", + models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="%(class)s_created_by", + to=settings.AUTH_USER_MODEL, + verbose_name="Created By", + ), + ), + ( + "updated_by", + models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="%(class)s_updated_by", + to=settings.AUTH_USER_MODEL, + verbose_name="Last Modified By", + ), + ), + ( + "workspace", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="stored_blobs", + to="db.workspace", + ), + ), + ], + options={ + "verbose_name": "Stored Blob", + "verbose_name_plural": "Stored Blobs", + "db_table": "stored_blobs", + "ordering": ("-created_at",), + }, + ), + migrations.AddField( + model_name="fileasset", + name="blob", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="assets", + to="db.storedblob", + ), + ), + migrations.AddIndex( + model_name="storedblob", + 
index=models.Index(fields=["workspace", "sha256"], name="stored_blob_workspace_sha_idx"), + ), + migrations.AddIndex( + model_name="storedblob", + index=models.Index(fields=["status"], name="stored_blob_status_idx"), + ), + migrations.AddConstraint( + model_name="storedblob", + constraint=models.UniqueConstraint( + condition=models.Q(("deleted_at__isnull", True), ("status", "active")), + fields=("workspace", "sha256", "size"), + name="stored_blob_unique_active_hash_size", + ), + ), + ] diff --git a/plane-src/apps/api/plane/db/models/__init__.py b/plane-src/apps/api/plane/db/models/__init__.py index 7c3d3b3..68c9566 100644 --- a/plane-src/apps/api/plane/db/models/__init__.py +++ b/plane-src/apps/api/plane/db/models/__init__.py @@ -4,7 +4,7 @@ from .analytic import AnalyticView from .api import APIActivityLog, APIToken -from .asset import FileAsset +from .asset import FileAsset, StoredBlob from .base import BaseModel from .cycle import Cycle, CycleIssue, CycleUserProperties from .deploy_board import DeployBoard diff --git a/plane-src/apps/api/plane/db/models/asset.py b/plane-src/apps/api/plane/db/models/asset.py index d309135..3d2821e 100644 --- a/plane-src/apps/api/plane/db/models/asset.py +++ b/plane-src/apps/api/plane/db/models/asset.py @@ -60,6 +60,13 @@ class FileAsset(BaseModel): size = models.FloatField(default=0) is_uploaded = models.BooleanField(default=False) storage_metadata = models.JSONField(default=dict, null=True, blank=True) + blob = models.ForeignKey( + "db.StoredBlob", + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="assets", + ) class Meta: verbose_name = "File Asset" @@ -98,3 +105,43 @@ class FileAsset(BaseModel): return f"/api/assets/v2/workspaces/{self.workspace.slug}/projects/{self.project_id}/{self.id}/" return None + + +class StoredBlob(BaseModel): + """ + Canonical stored object shared by one or more FileAsset records. 
+ """ + + class Status(models.TextChoices): + ACTIVE = "active", "Active" + ORPHANED = "orphaned", "Orphaned" + MISSING = "missing", "Missing" + + workspace = models.ForeignKey("db.Workspace", on_delete=models.CASCADE, related_name="stored_blobs") + sha256 = models.CharField(max_length=64, db_index=True) + size = models.PositiveBigIntegerField(default=0) + mime_type = models.CharField(max_length=255, blank=True, default="") + canonical_object_key = models.CharField(max_length=800) + status = models.CharField(max_length=32, choices=Status.choices, default=Status.ACTIVE) + ref_count = models.PositiveIntegerField(default=0) + storage_metadata = models.JSONField(default=dict, null=True, blank=True) + + class Meta: + verbose_name = "Stored Blob" + verbose_name_plural = "Stored Blobs" + db_table = "stored_blobs" + ordering = ("-created_at",) + indexes = [ + models.Index(fields=["workspace", "sha256"], name="stored_blob_workspace_sha_idx"), + models.Index(fields=["status"], name="stored_blob_status_idx"), + ] + constraints = [ + models.UniqueConstraint( + fields=["workspace", "sha256", "size"], + condition=models.Q(deleted_at__isnull=True, status="active"), + name="stored_blob_unique_active_hash_size", + ) + ] + + def __str__(self): + return self.canonical_object_key diff --git a/plane-src/apps/api/plane/settings/storage.py b/plane-src/apps/api/plane/settings/storage.py index e4a978b..fd5c1fa 100644 --- a/plane-src/apps/api/plane/settings/storage.py +++ b/plane-src/apps/api/plane/settings/storage.py @@ -22,7 +22,7 @@ class S3Storage(S3Boto3Storage): """S3 storage class to generate presigned URLs for S3 objects""" - def __init__(self, request=None): + def __init__(self, request=None, **kwargs): # Get the AWS credentials and bucket name from the environment self.aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID") # Use the AWS_SECRET_ACCESS_KEY environment variable for the secret key diff --git a/plane-src/apps/api/plane/space/views/asset.py 
b/plane-src/apps/api/plane/space/views/asset.py index 1749a8f..99ca310 100644 --- a/plane-src/apps/api/plane/space/views/asset.py +++ b/plane-src/apps/api/plane/space/views/asset.py @@ -15,9 +15,9 @@ from rest_framework import status from rest_framework.permissions import AllowAny, IsAuthenticated from rest_framework.response import Response -from plane.bgtasks.storage_metadata_task import get_asset_object_metadata from plane.db.models import DeployBoard, FileAsset from plane.settings.storage import S3Storage +from plane.utils.file_dedup import UploadedObjectMissing, confirm_uploaded_file_asset, release_file_asset_blob # Module imports from .base import BaseAPIView @@ -141,16 +141,17 @@ class EntityAssetEndpoint(BaseAPIView): # get the asset id asset = FileAsset.objects.get(id=pk, workspace=deploy_board.workspace) - # get the storage metadata - asset.is_uploaded = True - # get the storage metadata - if not asset.storage_metadata: - get_asset_object_metadata.delay(str(asset.id)) - - # update the attributes - asset.attributes = request.data.get("attributes", asset.attributes) - # save the asset - asset.save(update_fields=["attributes", "is_uploaded"]) + try: + confirm_uploaded_file_asset( + asset, + request=request, + attributes=request.data.get("attributes", asset.attributes), + ) + except UploadedObjectMissing: + return Response( + {"error": "The uploaded asset object was not found.", "status": False}, + status=status.HTTP_400_BAD_REQUEST, + ) return Response(status=status.HTTP_204_NO_CONTENT) def delete(self, request, anchor, pk): @@ -161,11 +162,13 @@ class EntityAssetEndpoint(BaseAPIView): return Response({"error": "Project is not published"}, status=status.HTTP_404_NOT_FOUND) # Get the asset asset = FileAsset.objects.get(id=pk, workspace=deploy_board.workspace, project_id=deploy_board.project_id) + if not asset.is_uploaded: + release_file_asset_blob(asset, request=request, delete_untracked_object=True) # Check deleted assets asset.is_deleted = True 
asset.deleted_at = timezone.now() # Save the asset - asset.save(update_fields=["is_deleted", "deleted_at"]) + asset.save(update_fields=["is_deleted", "deleted_at", "updated_at"]) return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/plane-src/apps/api/plane/tests/unit/utils/test_file_dedup.py b/plane-src/apps/api/plane/tests/unit/utils/test_file_dedup.py new file mode 100644 index 0000000..e8f9b2f --- /dev/null +++ b/plane-src/apps/api/plane/tests/unit/utils/test_file_dedup.py @@ -0,0 +1,152 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. + +from unittest.mock import patch + +import pytest + +from plane.db.models import FileAsset, StoredBlob +from plane.utils.file_dedup import attach_existing_blob_to_file_asset, finalize_uploaded_file_asset, release_file_asset_blob + + +class FakeStreamingBody: + def __init__(self, payload): + self.payload = payload + self.closed = False + + def iter_chunks(self, chunk_size): + for index in range(0, len(self.payload), chunk_size): + yield self.payload[index : index + chunk_size] + + def close(self): + self.closed = True + + +class FakeS3Client: + def __init__(self, objects): + self.objects = objects + + def get_object(self, Bucket, Key): + return {"Body": FakeStreamingBody(self.objects[Key]["body"])} + + +class FakeStorage: + def __init__(self, objects): + self.aws_storage_bucket_name = "uploads" + self.s3_client = FakeS3Client(objects) + self.deleted = [] + self.objects = objects + + def get_object_metadata(self, object_name): + item = self.objects.get(object_name) + if item is None: + return None + return { + "ContentType": item["type"], + "ContentLength": len(item["body"]), + "ETag": item.get("etag", "etag"), + "Metadata": {}, + } + + def delete_files(self, object_names): + self.deleted.extend(object_names) + for object_name in object_names: + self.objects.pop(object_name, None) + return True + + +@pytest.fixture +def 
project(workspace): + from plane.db.models import Project, ProjectMember + + project = Project.objects.create(name="Dedup Project", identifier="DEDUP", workspace=workspace) + ProjectMember.objects.create(project=project, member=workspace.owner) + return project + + +@pytest.fixture +def fake_storage(): + objects = { + "workspace/a.txt": {"body": b"same payload", "type": "text/plain"}, + "workspace/b.txt": {"body": b"same payload", "type": "text/plain"}, + } + return FakeStorage(objects) + + +def create_asset(workspace, project, object_key): + return FileAsset.objects.create( + workspace=workspace, + project=project, + asset=object_key, + size=12, + attributes={"name": object_key.rsplit("/", 1)[-1], "type": "text/plain", "size": 12}, + entity_type=FileAsset.EntityTypeContext.ISSUE_ATTACHMENT, + ) + + +@pytest.mark.django_db +def test_finalize_uploaded_file_asset_reuses_existing_blob(workspace, project, fake_storage): + first_asset = create_asset(workspace, project, "workspace/a.txt") + duplicate_asset = create_asset(workspace, project, "workspace/b.txt") + + with patch("plane.utils.file_dedup.S3Storage", return_value=fake_storage): + first_result = finalize_uploaded_file_asset(first_asset) + duplicate_result = finalize_uploaded_file_asset(duplicate_asset) + + first_asset.refresh_from_db() + duplicate_asset.refresh_from_db() + blob = StoredBlob.objects.get() + + assert first_result.deduplicated is False + assert duplicate_result.deduplicated is True + assert duplicate_result.deleted_duplicate_object is True + assert first_asset.blob_id == blob.id + assert duplicate_asset.blob_id == blob.id + assert first_asset.asset.name == "workspace/a.txt" + assert duplicate_asset.asset.name == "workspace/a.txt" + assert blob.ref_count == 2 + assert fake_storage.deleted == ["workspace/b.txt"] + + +@pytest.mark.django_db +def test_release_file_asset_blob_deletes_canonical_object_after_last_reference(workspace, project, fake_storage): + first_asset = create_asset(workspace, project, 
"workspace/a.txt") + duplicate_asset = create_asset(workspace, project, "workspace/b.txt") + + with patch("plane.utils.file_dedup.S3Storage", return_value=fake_storage): + finalize_uploaded_file_asset(first_asset) + finalize_uploaded_file_asset(duplicate_asset) + first_release = release_file_asset_blob(first_asset) + second_release = release_file_asset_blob(duplicate_asset) + + blob = StoredBlob.objects.get() + + assert first_release.had_blob is True + assert first_release.deleted_object is False + assert second_release.had_blob is True + assert second_release.deleted_object is True + assert second_release.object_key == "workspace/a.txt" + assert blob.ref_count == 0 + assert blob.status == StoredBlob.Status.ORPHANED + + +@pytest.mark.django_db +def test_attach_existing_blob_reuses_canonical_object_without_copy(workspace, project, fake_storage): + source_asset = create_asset(workspace, project, "workspace/a.txt") + target_asset = create_asset(workspace, project, "workspace/c.txt") + + with patch("plane.utils.file_dedup.S3Storage", return_value=fake_storage): + finalize_uploaded_file_asset(source_asset) + attached = attach_existing_blob_to_file_asset(source_asset, target_asset) + + source_asset.refresh_from_db() + target_asset.refresh_from_db() + blob = StoredBlob.objects.get() + + assert attached is True + assert source_asset.blob_id == blob.id + assert target_asset.blob_id == blob.id + assert target_asset.asset.name == "workspace/a.txt" + assert blob.ref_count == 2 + assert fake_storage.deleted == [] diff --git a/plane-src/apps/api/plane/utils/file_dedup.py b/plane-src/apps/api/plane/utils/file_dedup.py new file mode 100644 index 0000000..fb2446f --- /dev/null +++ b/plane-src/apps/api/plane/utils/file_dedup.py @@ -0,0 +1,317 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. 
import hashlib
from dataclasses import dataclass

from botocore.exceptions import ClientError
from django.db import IntegrityError, transaction

from plane.db.models import FileAsset, StoredBlob
from plane.settings.storage import S3Storage
from plane.utils.exception_logger import log_exception


class UploadedObjectMissing(Exception):
    """Raised when a FileAsset points to a storage object that is not available."""


@dataclass(frozen=True)
class FileBlobFinalizeResult:
    """Outcome of :func:`finalize_uploaded_file_asset`."""

    # Freshly re-read FileAsset row (a different instance from the one passed in).
    asset: FileAsset
    # Canonical blob the asset now references.
    blob: StoredBlob
    # Hex SHA-256 digest of the uploaded object.
    sha256: str
    # True when the asset was repointed to an already existing canonical object.
    deduplicated: bool
    # True when the post-commit storage cleanup reported success.
    deleted_duplicate_object: bool


@dataclass(frozen=True)
class FileBlobReleaseResult:
    """Outcome of :func:`release_file_asset_blob`."""

    # True when the asset referenced a blob before the call.
    had_blob: bool
    # True when a storage object was actually deleted.
    deleted_object: bool
    # Key that was scheduled for deletion, if any.
    object_key: str | None = None


def supports_blob_dedup(asset: FileAsset) -> bool:
    """Return True when the asset is workspace-scoped and can therefore share a blob."""
    return bool(asset.workspace_id)


def _object_key(asset: FileAsset) -> str:
    """Return the storage key the asset's file field currently points to."""
    return str(asset.asset.name or asset.asset)


def _attributes_dict(asset: FileAsset) -> dict:
    """
    Return a shallow copy of ``asset.attributes`` as a dict.

    ``attributes`` is a JSON column filled from request data, so it may hold a
    non-mapping value (string, list, ...); those are treated as empty instead of
    raising while the upload is being confirmed.
    """
    attributes = asset.attributes
    return dict(attributes) if isinstance(attributes, dict) else {}


def _get_object_metadata(storage: S3Storage, object_key: str) -> dict:
    """
    Fetch the storage metadata for ``object_key``.

    Raises:
        UploadedObjectMissing: when the storage backend returns no metadata.
    """
    metadata = storage.get_object_metadata(object_name=object_key)
    if not metadata:
        raise UploadedObjectMissing(f"Storage object not found: {object_key}")
    return metadata


def _calculate_object_sha256(storage: S3Storage, object_key: str) -> str:
    """
    Stream ``object_key`` from storage and return its hex SHA-256 digest.

    Raises:
        UploadedObjectMissing: when the object cannot be fetched.
    """
    try:
        response = storage.s3_client.get_object(Bucket=storage.aws_storage_bucket_name, Key=object_key)
    except ClientError as exc:
        # NOTE(review): every ClientError (not only "no such key") is reported as missing.
        raise UploadedObjectMissing(f"Storage object not found: {object_key}") from exc

    digest = hashlib.sha256()
    body = response["Body"]
    try:
        # Hash in 1 MiB chunks so large uploads are never held in memory at once.
        for chunk in body.iter_chunks(chunk_size=1024 * 1024):
            if chunk:
                digest.update(chunk)
    finally:
        body.close()

    return digest.hexdigest()


def _resolve_size(asset: FileAsset, metadata: dict) -> int:
    """
    Return the object size in bytes.

    Preference order: storage ``ContentLength``, then the client-reported
    ``attributes["size"]``, then ``asset.size``. A malformed or negative
    client-reported size is ignored rather than raised, because it is
    untrusted input and the blob size column only accepts non-negative values.
    """
    metadata_size = metadata.get("ContentLength")
    if metadata_size is not None:
        return int(metadata_size)

    attr_size = _attributes_dict(asset).get("size")
    if attr_size is not None:
        try:
            return max(int(float(attr_size)), 0)
        except (TypeError, ValueError, OverflowError):
            pass  # fall back to the model field below

    return int(asset.size or 0)


def _resolve_mime_type(asset: FileAsset, metadata: dict) -> str:
    """Return the client-reported type, else the storage ``ContentType``, else ``""``."""
    return str(_attributes_dict(asset).get("type") or metadata.get("ContentType") or "")


def _decrement_blob_reference(blob: StoredBlob) -> str | None:
    """
    Drop one reference from an already locked blob and persist it.

    Returns the blob's canonical object key when this was the last reference of
    an active blob (the blob is then marked orphaned), otherwise ``None``.
    """
    blob.ref_count = max(blob.ref_count - 1, 0)
    orphaned_key = None
    if blob.ref_count == 0 and blob.status == StoredBlob.Status.ACTIVE:
        blob.status = StoredBlob.Status.ORPHANED
        orphaned_key = blob.canonical_object_key
    blob.save(update_fields=["ref_count", "status", "updated_at"])
    return orphaned_key


def _drop_keys_in_use(object_keys: list[str | None]) -> list[str]:
    """
    Return the given keys minus empty ones and minus any key that is still the
    canonical object of an active blob.

    Must be called inside the transaction that changed the blobs so that the
    check sees those changes. Deleting a key that an active blob still points to
    would break every asset sharing that blob.
    """
    candidates = {key for key in object_keys if key}
    if not candidates:
        return []

    in_use = set(
        StoredBlob.objects.filter(
            canonical_object_key__in=candidates,
            status=StoredBlob.Status.ACTIVE,
            deleted_at__isnull=True,
        ).values_list("canonical_object_key", flat=True)
    )
    return sorted(candidates - in_use)


def _delete_objects_best_effort(object_keys: list[str], request=None, storage=None) -> bool:
    """
    Delete ``object_keys`` from storage, logging instead of raising on failure.

    This runs after the database transaction has committed, so a storage error
    must not surface as a failed request. The storage client is only created
    when there is something to delete and none was supplied.
    """
    if not object_keys:
        return False

    try:
        if storage is None:
            storage = S3Storage(request=request)
        return bool(storage.delete_files(object_names=list(object_keys)))
    except Exception as exc:  # cleanup is best-effort by design
        log_exception(exc)
        return False


def finalize_uploaded_file_asset(asset: FileAsset, request=None) -> FileBlobFinalizeResult:
    """
    Attach a confirmed upload to a canonical StoredBlob.

    The uploaded object is treated as a temporary candidate until its SHA-256 is
    known. If the same blob already exists in the workspace, the FileAsset is
    repointed to the canonical object and the duplicate candidate object is
    removed from storage.

    Args:
        asset: The FileAsset whose upload is being confirmed. Its blob-related
            fields are updated in place to mirror the saved row.
        request: Optional request forwarded to ``S3Storage``.

    Returns:
        A FileBlobFinalizeResult describing the blob and what was cleaned up.

    Raises:
        ValueError: when the asset has no workspace.
        UploadedObjectMissing: when the uploaded object cannot be found or read.
    """

    if not asset.workspace_id:
        raise ValueError("FileAsset workspace is required for blob deduplication.")

    storage = S3Storage(request=request)
    candidate_key = _object_key(asset)
    candidate_metadata = _get_object_metadata(storage, candidate_key)
    candidate_sha256 = _calculate_object_sha256(storage, candidate_key)
    candidate_size = _resolve_size(asset, candidate_metadata)
    candidate_mime_type = _resolve_mime_type(asset, candidate_metadata)

    duplicate_object_key = None
    released_object_key = None

    with transaction.atomic():
        locked_asset = FileAsset.objects.select_for_update().get(pk=asset.pk)
        previous_blob_id = locked_asset.blob_id

        active_blobs = StoredBlob.objects.select_for_update().filter(
            workspace_id=locked_asset.workspace_id,
            sha256=candidate_sha256,
            size=candidate_size,
            status=StoredBlob.Status.ACTIVE,
            deleted_at__isnull=True,
        )
        blob = active_blobs.first()
        if blob is None:
            try:
                # Savepoint: a unique-constraint hit must not poison the outer transaction.
                with transaction.atomic():
                    blob = StoredBlob.objects.create(
                        workspace_id=locked_asset.workspace_id,
                        sha256=candidate_sha256,
                        size=candidate_size,
                        mime_type=candidate_mime_type,
                        canonical_object_key=candidate_key,
                        status=StoredBlob.Status.ACTIVE,
                        ref_count=0,
                        storage_metadata=candidate_metadata,
                    )
            except IntegrityError:
                # A concurrent confirmation created the same blob first; reuse it.
                blob = active_blobs.get()

        deduplicated = blob.canonical_object_key != candidate_key
        if deduplicated:
            duplicate_object_key = candidate_key

        if previous_blob_id and previous_blob_id != blob.id:
            previous_blob = StoredBlob.objects.select_for_update().filter(pk=previous_blob_id).first()
            if previous_blob:
                released_object_key = _decrement_blob_reference(previous_blob)

        if previous_blob_id != blob.id:
            blob.ref_count += 1
            blob.save(update_fields=["ref_count", "updated_at"])

        attributes = _attributes_dict(locked_asset)
        attributes["size"] = candidate_size
        attributes["type"] = candidate_mime_type
        attributes["sha256"] = candidate_sha256
        attributes["blob_id"] = str(blob.id)
        attributes["deduplicated"] = deduplicated

        locked_asset.blob = blob
        locked_asset.asset = blob.canonical_object_key
        locked_asset.size = candidate_size
        locked_asset.attributes = attributes
        locked_asset.storage_metadata = blob.storage_metadata or candidate_metadata
        locked_asset.is_uploaded = True
        locked_asset.save(
            update_fields=[
                "blob",
                "asset",
                "size",
                "attributes",
                "storage_metadata",
                "is_uploaded",
                "updated_at",
            ]
        )

        # Never delete a key an active blob still uses. This covers the case where
        # the asset's key was overwritten with new content: the previous blob is
        # orphaned, but its key is the very object the new blob now points to.
        object_keys_to_delete = _drop_keys_in_use([duplicate_object_key, released_object_key])

    deleted_duplicate_object = _delete_objects_best_effort(object_keys_to_delete, storage=storage)

    # Mirror the committed state onto the caller's instance.
    locked_asset.refresh_from_db()
    asset.blob_id = locked_asset.blob_id
    asset.asset = locked_asset.asset
    asset.size = locked_asset.size
    asset.attributes = locked_asset.attributes
    asset.storage_metadata = locked_asset.storage_metadata
    asset.is_uploaded = locked_asset.is_uploaded
    return FileBlobFinalizeResult(
        asset=locked_asset,
        blob=blob,
        sha256=candidate_sha256,
        deduplicated=deduplicated,
        deleted_duplicate_object=deleted_duplicate_object,
    )


def confirm_uploaded_file_asset(asset: FileAsset, request=None, attributes: dict | None = None) -> FileAsset:
    """
    Mark an asset as uploaded, deduplicating its object when it is workspace-scoped.

    Args:
        asset: The FileAsset being confirmed.
        request: Optional request forwarded to ``S3Storage``.
        attributes: When not ``None``, replaces ``asset.attributes`` before confirming.

    Returns:
        The confirmed FileAsset (a re-read instance on the deduplication path).

    Raises:
        UploadedObjectMissing: on the deduplication path, when the object is not in storage.
    """
    if attributes is not None:
        asset.attributes = attributes
        asset.save(update_fields=["attributes", "updated_at"])

    if supports_blob_dedup(asset):
        return finalize_uploaded_file_asset(asset, request=request).asset

    # Assets without a workspace keep the non-blob flow: record metadata and flag as uploaded.
    storage = S3Storage(request=request)
    asset.storage_metadata = storage.get_object_metadata(object_name=_object_key(asset)) or asset.storage_metadata
    asset.is_uploaded = True
    asset.save(update_fields=["storage_metadata", "is_uploaded", "updated_at"])
    return asset


def release_file_asset_blob(
    asset: FileAsset,
    request=None,
    delete_untracked_object: bool = False,
) -> FileBlobReleaseResult:
    """
    Release the blob reference held by a FileAsset.

    The canonical object is deleted only when the last blob-backed FileAsset
    releases it. Legacy non-blob assets are left intact unless the caller marks
    them as temporary/untracked.

    Args:
        asset: The FileAsset giving up its blob; ``blob_id`` is cleared in place.
        request: Optional request forwarded to ``S3Storage``.
        delete_untracked_object: Delete the asset's own object when it has no blob.

    Returns:
        A FileBlobReleaseResult. Storage failures are logged, never raised.
    """

    delete_object_key = None
    had_blob = False

    with transaction.atomic():
        locked_asset = FileAsset.objects.select_for_update().get(pk=asset.pk)

        if locked_asset.blob_id:
            had_blob = True
            # Tolerate a blob row that is no longer visible, as finalize does,
            # and still clear the dangling reference.
            blob = StoredBlob.objects.select_for_update().filter(pk=locked_asset.blob_id).first()
            if blob is not None:
                delete_object_key = _decrement_blob_reference(blob)

            locked_asset.blob = None
            locked_asset.save(update_fields=["blob", "updated_at"])
        elif delete_untracked_object:
            delete_object_key = _object_key(locked_asset) or None

        # A released asset keeps pointing at the canonical key; without this
        # check a second release with delete_untracked_object=True would delete
        # an object other assets still share.
        if delete_object_key and not _drop_keys_in_use([delete_object_key]):
            delete_object_key = None

    if had_blob:
        asset.blob_id = None

    deleted_object = False
    if delete_object_key:
        deleted_object = _delete_objects_best_effort([delete_object_key], request=request)

    return FileBlobReleaseResult(had_blob=had_blob, deleted_object=deleted_object, object_key=delete_object_key)


def attach_existing_blob_to_file_asset(source: FileAsset, target: FileAsset) -> bool:
    """
    Point ``target`` at the blob already referenced by ``source`` without copying the object.

    Args:
        source: FileAsset that already references a blob.
        target: FileAsset that should share that blob; updated in place on success.

    Returns:
        True when ``target`` references the source blob after the call, False
        when ``source`` has no blob or its blob is no longer active (the caller
        should then fall back to its own copy path).
    """
    if not source.blob_id:
        source.refresh_from_db(fields=["blob"])

    if not source.blob_id:
        return False

    with transaction.atomic():
        # Lock the asset before the blob, the same order finalize/release use,
        # so concurrent calls on the same rows cannot deadlock on each other.
        locked_target = FileAsset.objects.select_for_update().get(pk=target.pk)
        source_blob = (
            StoredBlob.objects.select_for_update()
            .filter(
                pk=source.blob_id,
                status=StoredBlob.Status.ACTIVE,
                deleted_at__isnull=True,
            )
            .first()
        )
        if source_blob is None:
            # An orphaned or missing blob has no reliable object behind it.
            return False

        if locked_target.blob_id == source_blob.id:
            return True

        # Release the blob the target referenced before, otherwise its
        # ref_count never reaches zero and its object is never reclaimed.
        released_object_key = None
        if locked_target.blob_id:
            previous_blob = StoredBlob.objects.select_for_update().filter(pk=locked_target.blob_id).first()
            if previous_blob:
                released_object_key = _decrement_blob_reference(previous_blob)

        source_blob.ref_count += 1
        source_blob.save(update_fields=["ref_count", "updated_at"])

        attributes = _attributes_dict(locked_target)
        attributes["sha256"] = source_blob.sha256
        attributes["blob_id"] = str(source_blob.id)
        attributes["deduplicated"] = True

        locked_target.blob = source_blob
        locked_target.asset = source_blob.canonical_object_key
        locked_target.size = source_blob.size
        locked_target.attributes = attributes
        locked_target.storage_metadata = source_blob.storage_metadata
        locked_target.is_uploaded = True
        locked_target.save(
            update_fields=[
                "blob",
                "asset",
                "size",
                "attributes",
                "storage_metadata",
                "is_uploaded",
                "updated_at",
            ]
        )

        object_keys_to_delete = _drop_keys_in_use([released_object_key])

    _delete_objects_best_effort(object_keys_to_delete)

    # Mirror every written field so a later full save() of ``target`` cannot
    # overwrite the attachment with stale in-memory values.
    target.blob_id = source_blob.id
    target.asset = source_blob.canonical_object_key
    target.size = source_blob.size
    target.attributes = attributes
    target.storage_metadata = source_blob.storage_metadata
    target.is_uploaded = True
    return True