# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.

import hashlib
from dataclasses import dataclass

from botocore.exceptions import ClientError
from django.db import IntegrityError, transaction

from plane.db.models import FileAsset, StoredBlob
from plane.settings.storage import S3Storage
from plane.utils.exception_logger import log_exception

# Number of bytes read per iteration while hashing a stored object.
_HASH_CHUNK_SIZE = 1024 * 1024


class UploadedObjectMissing(Exception):
    """Raised when a FileAsset points to a storage object that is not available."""


@dataclass(frozen=True)
class FileBlobFinalizeResult:
    """Outcome of ``finalize_uploaded_file_asset``."""

    # FileAsset row as re-read from the database after finalization.
    asset: FileAsset
    # StoredBlob the asset is attached to.
    blob: StoredBlob
    # Hex SHA-256 digest computed from the uploaded object's bytes.
    sha256: str
    # True when the blob's canonical key differs from the uploaded object's key.
    deduplicated: bool
    # Result of the post-commit storage cleanup (duplicate and/or released
    # object); False when nothing had to be deleted or the deletion failed.
    deleted_duplicate_object: bool


@dataclass(frozen=True)
class FileBlobReleaseResult:
    """Outcome of ``release_file_asset_blob``."""

    # True when the asset referenced a StoredBlob at release time.
    had_blob: bool
    # Result of the storage deletion; False when nothing was deleted.
    deleted_object: bool
    # Key that was scheduled for deletion, if any.
    object_key: str | None = None


def supports_blob_dedup(asset: FileAsset) -> bool:
    """Return True when the asset has a workspace, which deduplication requires."""
    return bool(asset.workspace_id)


def _object_key(asset: FileAsset) -> str:
    """Return the storage key of the asset's file as a plain string."""
    return str(asset.asset.name or asset.asset)


def _server_storage(request=None) -> S3Storage:
    """Build an S3Storage client configured with ``is_server=True``."""
    return S3Storage(request=request, is_server=True)


def _get_object_metadata(storage: S3Storage, object_key: str) -> dict:
    """
    Fetch the metadata of ``object_key``.

    Raises:
        UploadedObjectMissing: when the storage layer returns no metadata.
    """
    metadata = storage.get_object_metadata(object_name=object_key)
    if not metadata:
        raise UploadedObjectMissing(f"Storage object not found: {object_key}")
    return metadata


def _calculate_object_sha256(storage: S3Storage, object_key: str) -> str:
    """
    Stream ``object_key`` from storage and return its hex SHA-256 digest.

    Raises:
        UploadedObjectMissing: when ``get_object`` fails with a ``ClientError``.
            Every ``ClientError`` is reported this way, not only missing keys.
    """
    try:
        response = storage.s3_client.get_object(Bucket=storage.aws_storage_bucket_name, Key=object_key)
    except ClientError as exc:
        raise UploadedObjectMissing(f"Storage object not found: {object_key}") from exc

    digest = hashlib.sha256()
    body = response["Body"]
    try:
        for chunk in body.iter_chunks(chunk_size=_HASH_CHUNK_SIZE):
            if chunk:
                digest.update(chunk)
    finally:
        # Always release the streaming body, even when reading fails midway.
        body.close()
    return digest.hexdigest()


def _resolve_size(asset: FileAsset, metadata: dict) -> int:
    """
    Return the object size in bytes.

    Preference order: storage ``ContentLength``, then ``attributes["size"]``,
    then the ``size`` model field, then 0.
    """
    metadata_size = metadata.get("ContentLength")
    if metadata_size is not None:
        return int(metadata_size)

    attr_size = (asset.attributes or {}).get("size")
    if attr_size is not None:
        try:
            return int(float(attr_size))
        except (TypeError, ValueError, OverflowError):
            # ``attributes`` is free-form data handed in by callers; an
            # unusable value must not abort finalization, so fall through to
            # the model field instead.
            pass
    return int(asset.size or 0)


def _resolve_mime_type(asset: FileAsset, metadata: dict) -> str:
    """Return ``attributes["type"]``, else the storage ``ContentType``, else ""."""
    return str((asset.attributes or {}).get("type") or metadata.get("ContentType") or "")


def _matching_active_blobs(workspace_id, sha256: str, size: int):
    """Return a row-locking queryset of live ACTIVE blobs with this content."""
    return StoredBlob.objects.select_for_update().filter(
        workspace_id=workspace_id,
        sha256=sha256,
        size=size,
        status=StoredBlob.Status.ACTIVE,
        deleted_at__isnull=True,
    )


def _drop_blob_reference(blob: StoredBlob) -> str | None:
    """
    Decrement ``blob.ref_count`` (never below zero) and persist the change.

    When the count reaches zero on an ACTIVE blob, the blob is marked ORPHANED
    and its canonical object key is returned so the caller can delete the
    object after the transaction commits. Otherwise returns None.

    The caller must hold a row lock on ``blob`` inside a transaction.
    """
    released_key = None
    blob.ref_count = max(blob.ref_count - 1, 0)
    if blob.ref_count == 0 and blob.status == StoredBlob.Status.ACTIVE:
        blob.status = StoredBlob.Status.ORPHANED
        released_key = blob.canonical_object_key
    blob.save(update_fields=["ref_count", "status", "updated_at"])
    return released_key


def _delete_objects_best_effort(storage: S3Storage, object_keys: list[str]) -> bool:
    """
    Delete ``object_keys`` from storage without letting failures propagate.

    Returns whatever ``storage.delete_files`` returns, or False when it raises
    (the exception is logged).
    """
    try:
        return storage.delete_files(object_names=object_keys)
    except Exception as exc:
        # Runs after the database has committed: a storage hiccup must not be
        # reported to the caller as a failure of the already-applied change.
        log_exception(exc)
        return False


def finalize_uploaded_file_asset(asset: FileAsset, request=None) -> FileBlobFinalizeResult:
    """
    Attach a confirmed upload to a canonical StoredBlob.

    The uploaded object is treated as a temporary candidate until its SHA-256 is
    known. If the same blob already exists in the workspace, the FileAsset is
    repointed to the canonical object and the duplicate candidate object is
    removed from storage.

    Storage cleanup happens after the database transaction and is best-effort:
    failures are logged and reflected in ``deleted_duplicate_object``.

    Args:
        asset: Asset whose upload was confirmed. Its ``blob_id``, ``asset``,
            ``size``, ``attributes``, ``storage_metadata`` and ``is_uploaded``
            are updated in place to mirror the stored row.
        request: Optional request forwarded to the storage client.

    Returns:
        A FileBlobFinalizeResult describing the attached blob.

    Raises:
        ValueError: when the asset has no workspace.
        UploadedObjectMissing: when the uploaded object cannot be read.
    """
    if not asset.workspace_id:
        raise ValueError("FileAsset workspace is required for blob deduplication.")

    storage = _server_storage(request=request)
    candidate_key = _object_key(asset)
    candidate_metadata = _get_object_metadata(storage, candidate_key)
    candidate_sha256 = _calculate_object_sha256(storage, candidate_key)
    candidate_size = _resolve_size(asset, candidate_metadata)
    candidate_mime_type = _resolve_mime_type(asset, candidate_metadata)

    duplicate_object_key = None
    released_object_key = None

    with transaction.atomic():
        # Lock order throughout this module: FileAsset row first, then StoredBlob.
        locked_asset = FileAsset.objects.select_for_update().get(pk=asset.pk)
        previous_blob_id = locked_asset.blob_id

        blob = _matching_active_blobs(locked_asset.workspace_id, candidate_sha256, candidate_size).first()
        if blob is None:
            try:
                # Nested atomic block (savepoint) so an IntegrityError does not
                # poison the outer transaction.
                with transaction.atomic():
                    blob = StoredBlob.objects.create(
                        workspace_id=locked_asset.workspace_id,
                        sha256=candidate_sha256,
                        size=candidate_size,
                        mime_type=candidate_mime_type,
                        canonical_object_key=candidate_key,
                        status=StoredBlob.Status.ACTIVE,
                        ref_count=0,
                        storage_metadata=candidate_metadata,
                    )
            except IntegrityError:
                # Presumably a concurrent finalize created the same blob first;
                # reuse its row.
                # NOTE(review): if the conflicting row is not ACTIVE or is
                # soft-deleted, this ``get()`` raises DoesNotExist - confirm
                # against the StoredBlob constraints.
                blob = _matching_active_blobs(locked_asset.workspace_id, candidate_sha256, candidate_size).get()

        deduplicated = blob.canonical_object_key != candidate_key
        if deduplicated:
            duplicate_object_key = candidate_key

        if previous_blob_id and previous_blob_id != blob.id:
            previous_blob = StoredBlob.objects.select_for_update().filter(pk=previous_blob_id).first()
            if previous_blob:
                released_object_key = _drop_blob_reference(previous_blob)

        if previous_blob_id != blob.id:
            # Only count the reference once; re-finalizing onto the same blob
            # must not inflate ref_count.
            blob.ref_count += 1
            blob.save(update_fields=["ref_count", "updated_at"])

        attributes = dict(locked_asset.attributes or {})
        attributes["size"] = candidate_size
        attributes["type"] = candidate_mime_type
        attributes["sha256"] = candidate_sha256
        attributes["blob_id"] = str(blob.id)
        attributes["deduplicated"] = deduplicated

        locked_asset.blob = blob
        locked_asset.asset = blob.canonical_object_key
        locked_asset.size = candidate_size
        locked_asset.attributes = attributes
        locked_asset.storage_metadata = blob.storage_metadata or candidate_metadata
        locked_asset.is_uploaded = True
        locked_asset.save(
            update_fields=[
                "blob",
                "asset",
                "size",
                "attributes",
                "storage_metadata",
                "is_uploaded",
                "updated_at",
            ]
        )

    # Never delete the object the asset now points at: a released previous blob
    # can share its key with the new blob when the same key was re-uploaded with
    # different content.
    canonical_key = blob.canonical_object_key
    object_keys_to_delete = sorted(
        {key for key in (duplicate_object_key, released_object_key) if key and key != canonical_key}
    )

    deleted_duplicate_object = False
    if object_keys_to_delete:
        deleted_duplicate_object = _delete_objects_best_effort(storage, object_keys_to_delete)

    # Mirror the persisted state onto the instance the caller passed in.
    locked_asset.refresh_from_db()
    asset.blob_id = locked_asset.blob_id
    asset.asset = locked_asset.asset
    asset.size = locked_asset.size
    asset.attributes = locked_asset.attributes
    asset.storage_metadata = locked_asset.storage_metadata
    asset.is_uploaded = locked_asset.is_uploaded

    return FileBlobFinalizeResult(
        asset=locked_asset,
        blob=blob,
        sha256=candidate_sha256,
        deduplicated=deduplicated,
        deleted_duplicate_object=deleted_duplicate_object,
    )


def confirm_uploaded_file_asset(asset: FileAsset, request=None, attributes: dict | None = None) -> FileAsset:
    """
    Mark an asset as uploaded, deduplicating it when it belongs to a workspace.

    Args:
        asset: Asset whose upload is being confirmed.
        request: Optional request forwarded to the storage client.
        attributes: When not None, replaces ``asset.attributes`` and is saved
            before anything else happens.

    Returns:
        The finalized asset row for workspace assets, otherwise ``asset`` itself
        with refreshed ``storage_metadata`` and ``is_uploaded=True``.
    """
    if attributes is not None:
        asset.attributes = attributes
        asset.save(update_fields=["attributes", "updated_at"])

    if supports_blob_dedup(asset):
        return finalize_uploaded_file_asset(asset, request=request).asset

    # Legacy path: no blob bookkeeping, just record metadata and the flag.
    storage = _server_storage(request=request)
    asset.storage_metadata = storage.get_object_metadata(object_name=_object_key(asset)) or asset.storage_metadata
    asset.is_uploaded = True
    asset.save(update_fields=["storage_metadata", "is_uploaded", "updated_at"])
    return asset


def release_file_asset_blob(
    asset: FileAsset,
    request=None,
    delete_untracked_object: bool = False,
    storage: S3Storage | None = None,
) -> FileBlobReleaseResult:
    """
    Release the blob reference held by a FileAsset.

    The canonical object is deleted only when the last blob-backed FileAsset
    releases it. Legacy non-blob assets are left intact unless the caller marks
    them as temporary/untracked.

    Args:
        asset: Asset whose blob reference is released; ``asset.blob_id`` is
            cleared in place when a blob was attached.
        request: Optional request forwarded to the storage client.
        delete_untracked_object: Delete the asset's own object when it has no
            blob attached.
        storage: Optional storage client to reuse; one is created when omitted.

    Returns:
        A FileBlobReleaseResult. Storage deletion is best-effort: failures are
        logged and reported as ``deleted_object=False``.
    """
    storage = storage or _server_storage(request=request)
    delete_object_key = None
    had_blob = False

    with transaction.atomic():
        # Lock order: FileAsset row first, then StoredBlob.
        locked_asset = FileAsset.all_objects.select_for_update().get(pk=asset.pk)
        current_key = _object_key(locked_asset)

        if locked_asset.blob_id:
            had_blob = True
            blob = StoredBlob.objects.select_for_update().get(pk=locked_asset.blob_id)
            delete_object_key = _drop_blob_reference(blob)
            locked_asset.blob = None
            locked_asset.save(update_fields=["blob", "updated_at"])
        elif delete_untracked_object and current_key:
            delete_object_key = current_key

    if had_blob:
        asset.blob_id = None

    deleted_object = False
    if delete_object_key:
        deleted_object = _delete_objects_best_effort(storage, [delete_object_key])

    return FileBlobReleaseResult(had_blob=had_blob, deleted_object=deleted_object, object_key=delete_object_key)


def _delete_legacy_asset_object(asset: FileAsset, storage: S3Storage) -> bool:
    """
    Delete a non-blob asset's object unless another FileAsset row shares its key.

    Returns False when the asset has no key or the key is still referenced,
    otherwise the result of ``storage.delete_files``.
    """
    object_key = _object_key(asset)
    if not object_key:
        return False

    has_other_reference = FileAsset.all_objects.filter(asset=object_key).exclude(pk=asset.pk).exists()
    if has_other_reference:
        return False
    return storage.delete_files(object_names=[object_key])


def hard_delete_file_asset(asset: FileAsset, request=None, storage: S3Storage | None = None) -> bool:
    """
    Permanently remove a FileAsset row and release its storage reference.

    Blob-backed assets release the StoredBlob ref-count. Legacy non-blob assets
    are physically deleted only when no other FileAsset row still points at the
    same object key.

    Args:
        asset: Asset to delete.
        request: Optional request forwarded to the storage client.
        storage: Optional storage client to reuse on both code paths.

    Returns:
        Whether a storage object was deleted.
    """
    storage = storage or _server_storage(request=request)
    blob_id = asset.blob_id

    if blob_id:
        # Reuse the caller's storage client instead of building a second one.
        release_result = release_file_asset_blob(asset, request=request, storage=storage)
        deleted_object = release_result.deleted_object
        # Drop the blob row only once nothing references it any more.
        StoredBlob.all_objects.filter(
            pk=blob_id,
            ref_count=0,
            status=StoredBlob.Status.ORPHANED,
        ).delete()
    else:
        deleted_object = _delete_legacy_asset_object(asset, storage)

    asset.delete(soft=False)
    return deleted_object


def attach_existing_blob_to_file_asset(source: FileAsset, target: FileAsset) -> bool:
    """
    Point ``target`` at the StoredBlob already held by ``source``.

    Args:
        source: Asset whose blob is shared; its ``blob`` field is re-read from
            the database when not loaded.
        target: Asset to attach. On a new attachment its ``blob_id`` and
            ``asset`` are updated in place.

    Returns:
        False when ``source`` has no blob, True otherwise (including when
        ``target`` was already attached to the same blob).
    """
    if not source.blob_id:
        source.refresh_from_db(fields=["blob"])
    if not source.blob_id:
        return False

    with transaction.atomic():
        # Same lock order as finalize/release (FileAsset, then StoredBlob) so
        # concurrent calls on the same rows cannot deadlock on inverted locks.
        locked_target = FileAsset.objects.select_for_update().get(pk=target.pk)
        source_blob = StoredBlob.objects.select_for_update().get(pk=source.blob_id)

        if locked_target.blob_id == source_blob.id:
            return True

        # NOTE(review): when the target already references a different blob,
        # that blob's ref_count is not released here - confirm callers only
        # pass targets without a blob.
        source_blob.ref_count += 1
        source_blob.save(update_fields=["ref_count", "updated_at"])

        attributes = dict(locked_target.attributes or {})
        attributes["sha256"] = source_blob.sha256
        attributes["blob_id"] = str(source_blob.id)
        attributes["deduplicated"] = True

        locked_target.blob = source_blob
        locked_target.asset = source_blob.canonical_object_key
        locked_target.size = source_blob.size
        locked_target.attributes = attributes
        locked_target.storage_metadata = source_blob.storage_metadata
        locked_target.is_uploaded = True
        locked_target.save(
            update_fields=[
                "blob",
                "asset",
                "size",
                "attributes",
                "storage_metadata",
                "is_uploaded",
                "updated_at",
            ]
        )

    target.blob_id = source.blob_id
    target.asset = source_blob.canonical_object_key
    return True