362 lines
12 KiB
Python
362 lines
12 KiB
Python
# Copyright (c) 2023-present Plane Software, Inc. and contributors
|
|
# SPDX-License-Identifier: AGPL-3.0-only
|
|
# See the LICENSE file for details.
|
|
|
|
import hashlib
|
|
from dataclasses import dataclass
|
|
|
|
from botocore.exceptions import ClientError
|
|
from django.db import IntegrityError, transaction
|
|
|
|
from plane.db.models import FileAsset, StoredBlob
|
|
from plane.settings.storage import S3Storage
|
|
from plane.utils.exception_logger import log_exception
|
|
|
|
|
|
class UploadedObjectMissing(Exception):
    """Signal that the storage object backing a FileAsset could not be found or read."""
|
|
|
|
|
|
@dataclass(frozen=True)
class FileBlobFinalizeResult:
    """Immutable outcome of ``finalize_uploaded_file_asset``."""

    # FileAsset row as re-read from the database once finalization finished.
    asset: FileAsset
    # StoredBlob the asset references after finalization.
    blob: StoredBlob
    # Hex SHA-256 digest computed over the uploaded object's bytes.
    sha256: str
    # True when the blob's canonical object key differs from the uploaded key.
    deduplicated: bool
    # Result of the storage delete call for superseded objects; False when no
    # delete was issued.
    deleted_duplicate_object: bool
|
|
|
|
|
|
@dataclass(frozen=True)
class FileBlobReleaseResult:
    """Immutable outcome of ``release_file_asset_blob``."""

    # True when the asset referenced a StoredBlob at release time.
    had_blob: bool
    # Result of the storage delete call; False when no delete was issued or it raised.
    deleted_object: bool
    # Object key that was selected for deletion, if any.
    object_key: str | None = None
|
|
|
|
|
|
def supports_blob_dedup(asset: FileAsset) -> bool:
    """Return True when the asset has a workspace id, which blob deduplication requires."""
    return True if asset.workspace_id else False
|
|
|
|
|
|
def _object_key(asset: FileAsset) -> str:
    """Return the storage key of the asset's file: its ``name``, else the field's string form."""
    stored_file = asset.asset
    file_name = stored_file.name
    return str(file_name if file_name else stored_file)
|
|
|
|
|
|
def _server_storage(request=None) -> S3Storage:
    """Build an S3Storage for the given request with ``is_server=True``."""
    server_side_storage = S3Storage(request=request, is_server=True)
    return server_side_storage
|
|
|
|
|
|
def _get_object_metadata(storage: S3Storage, object_key: str) -> dict:
    """
    Fetch the metadata of ``object_key`` from storage.

    Raises:
        UploadedObjectMissing: when storage returns no (or empty) metadata.
    """
    found = storage.get_object_metadata(object_name=object_key)
    if found:
        return found
    raise UploadedObjectMissing(f"Storage object not found: {object_key}")
|
|
|
|
|
|
def _calculate_object_sha256(storage: S3Storage, object_key: str) -> str:
    """
    Stream ``object_key`` from the storage bucket and return its hex SHA-256 digest.

    The body is read in 1 MiB chunks and is always closed, even if reading fails.

    Raises:
        UploadedObjectMissing: when the ``get_object`` call raises ``ClientError``.
    """
    try:
        s3_object = storage.s3_client.get_object(
            Bucket=storage.aws_storage_bucket_name,
            Key=object_key,
        )
    except ClientError as error:
        raise UploadedObjectMissing(f"Storage object not found: {object_key}") from error

    hasher = hashlib.sha256()
    stream = s3_object["Body"]
    read_size = 1024 * 1024
    try:
        # filter(None, ...) skips empty chunks, which carry no data to hash.
        for piece in filter(None, stream.iter_chunks(chunk_size=read_size)):
            hasher.update(piece)
    finally:
        stream.close()

    return hasher.hexdigest()
|
|
|
|
|
|
def _resolve_size(asset: FileAsset, metadata: dict) -> int:
    """
    Determine the byte size to record for an uploaded asset.

    Sources are consulted in order of trust:

    1. ``metadata["ContentLength"]`` as reported by storage.
    2. ``asset.attributes["size"]``.
    3. ``asset.size`` (0 when unset).

    ``attributes`` can be caller-supplied (``confirm_uploaded_file_asset``
    stores whatever dict it is given), so a value that cannot be interpreted
    as a finite number is ignored instead of aborting the upload confirmation.

    Args:
        asset: Asset whose ``attributes`` and ``size`` act as fallbacks.
        metadata: Storage metadata for the uploaded object.

    Returns:
        The resolved size as an integer.
    """
    metadata_size = metadata.get("ContentLength")
    if metadata_size is not None:
        return int(metadata_size)

    attr_size = (asset.attributes or {}).get("size")
    if attr_size is not None:
        try:
            # float() first so numeric strings such as "10.0" are accepted.
            return int(float(attr_size))
        except (TypeError, ValueError, OverflowError):
            # Non-numeric, NaN or infinite value: fall back to asset.size below.
            pass

    return int(asset.size or 0)
|
|
|
|
|
|
def _resolve_mime_type(asset: FileAsset, metadata: dict) -> str:
    """
    Pick the MIME type to record for an asset.

    A truthy ``attributes["type"]`` wins; otherwise the storage metadata's
    ``ContentType`` is used; an empty string is returned when neither is set.
    """
    declared_type = (asset.attributes or {}).get("type")
    if declared_type:
        return str(declared_type)

    stored_type = metadata.get("ContentType")
    return str(stored_type) if stored_type else ""
|
|
|
|
|
|
def _lock_active_blobs(workspace_id, sha256: str, size: int):
    """Return a row-locking queryset of ACTIVE, non-deleted blobs with this workspace, hash and size."""
    return StoredBlob.objects.select_for_update().filter(
        workspace_id=workspace_id,
        sha256=sha256,
        size=size,
        status=StoredBlob.Status.ACTIVE,
        deleted_at__isnull=True,
    )


def finalize_uploaded_file_asset(asset: FileAsset, request=None) -> FileBlobFinalizeResult:
    """
    Attach a confirmed upload to a canonical StoredBlob.

    The uploaded object is treated as a temporary candidate until its SHA-256 is
    known. If the same blob already exists in the workspace, the FileAsset is
    repointed to the canonical object and the duplicate candidate object is
    removed from storage. If the asset previously referenced a different blob,
    that blob's ref-count is decremented and, when it reaches zero, the blob is
    marked ORPHANED and its object is removed as well.

    Storage cleanup is best-effort: a failing delete is logged and reported via
    ``deleted_duplicate_object=False`` instead of failing a finalization whose
    database changes have already been written. The object key the asset ends
    up referencing is never deleted.

    Args:
        asset: The FileAsset whose upload was confirmed. Its ``blob_id``,
            ``asset``, ``size``, ``attributes``, ``storage_metadata`` and
            ``is_uploaded`` are updated in place to mirror the saved row.
        request: Optional request forwarded to the storage backend.

    Returns:
        A FileBlobFinalizeResult describing the stored row, the blob, the
        digest, and whether deduplication / cleanup happened.

    Raises:
        ValueError: If the asset has no workspace.
        UploadedObjectMissing: If the uploaded object's metadata or body
            cannot be read from storage.
    """
    if not asset.workspace_id:
        raise ValueError("FileAsset workspace is required for blob deduplication.")

    # Inspect and hash the candidate before opening the transaction so row
    # locks are not held while the object is streamed from storage.
    storage = _server_storage(request=request)
    candidate_key = _object_key(asset)
    candidate_metadata = _get_object_metadata(storage, candidate_key)
    candidate_sha256 = _calculate_object_sha256(storage, candidate_key)
    candidate_size = _resolve_size(asset, candidate_metadata)
    candidate_mime_type = _resolve_mime_type(asset, candidate_metadata)

    duplicate_object_key = None
    released_object_key = None

    with transaction.atomic():
        locked_asset = FileAsset.objects.select_for_update().get(pk=asset.pk)
        previous_blob_id = locked_asset.blob_id

        blob = _lock_active_blobs(locked_asset.workspace_id, candidate_sha256, candidate_size).first()
        if blob is None:
            try:
                # Nested atomic: a failed INSERT rolls back only this savepoint
                # and leaves the outer transaction usable.
                with transaction.atomic():
                    blob = StoredBlob.objects.create(
                        workspace_id=locked_asset.workspace_id,
                        sha256=candidate_sha256,
                        size=candidate_size,
                        mime_type=candidate_mime_type,
                        canonical_object_key=candidate_key,
                        status=StoredBlob.Status.ACTIVE,
                        ref_count=0,
                        storage_metadata=candidate_metadata,
                    )
            except IntegrityError:
                # NOTE(review): presumably a concurrent finalize inserted the
                # same blob first and tripped a uniqueness constraint on
                # StoredBlob — confirm which columns the constraint covers.
                blob = _lock_active_blobs(locked_asset.workspace_id, candidate_sha256, candidate_size).get()

        deduplicated = blob.canonical_object_key != candidate_key
        if deduplicated:
            duplicate_object_key = candidate_key

        if previous_blob_id and previous_blob_id != blob.id:
            previous_blob = StoredBlob.objects.select_for_update().filter(pk=previous_blob_id).first()
            if previous_blob:
                previous_blob.ref_count = max(previous_blob.ref_count - 1, 0)
                if previous_blob.ref_count == 0 and previous_blob.status == StoredBlob.Status.ACTIVE:
                    previous_blob.status = StoredBlob.Status.ORPHANED
                    released_object_key = previous_blob.canonical_object_key
                previous_blob.save(update_fields=["ref_count", "status", "updated_at"])

        # Re-finalizing an asset already attached to this blob must not count twice.
        if previous_blob_id != blob.id:
            blob.ref_count += 1
            blob.save(update_fields=["ref_count", "updated_at"])

        attributes = dict(locked_asset.attributes or {})
        attributes["size"] = candidate_size
        attributes["type"] = candidate_mime_type
        attributes["sha256"] = candidate_sha256
        attributes["blob_id"] = str(blob.id)
        attributes["deduplicated"] = deduplicated

        locked_asset.blob = blob
        locked_asset.asset = blob.canonical_object_key
        locked_asset.size = candidate_size
        locked_asset.attributes = attributes
        locked_asset.storage_metadata = blob.storage_metadata or candidate_metadata
        locked_asset.is_uploaded = True
        locked_asset.save(
            update_fields=[
                "blob",
                "asset",
                "size",
                "attributes",
                "storage_metadata",
                "is_uploaded",
                "updated_at",
            ]
        )

    # Storage cleanup runs only after the transaction block has exited.
    # The key the asset now references is excluded: when an asset is
    # re-finalized on the same object key with new content, the released
    # blob's canonical key equals the new blob's canonical key, and deleting
    # it would destroy the live object.
    object_keys_to_delete = [
        key
        for key in {duplicate_object_key, released_object_key}
        if key and key != blob.canonical_object_key
    ]
    deleted_duplicate_object = False
    if object_keys_to_delete:
        try:
            deleted_duplicate_object = storage.delete_files(object_names=object_keys_to_delete)
        except Exception as exc:
            # Best-effort, matching release_file_asset_blob: the database is
            # already consistent, so a storage failure is logged, not raised.
            log_exception(exc)
            deleted_duplicate_object = False

    # Mirror the persisted state onto the caller's instance.
    locked_asset.refresh_from_db()
    asset.blob_id = locked_asset.blob_id
    asset.asset = locked_asset.asset
    asset.size = locked_asset.size
    asset.attributes = locked_asset.attributes
    asset.storage_metadata = locked_asset.storage_metadata
    asset.is_uploaded = locked_asset.is_uploaded
    return FileBlobFinalizeResult(
        asset=locked_asset,
        blob=blob,
        sha256=candidate_sha256,
        deduplicated=deduplicated,
        deleted_duplicate_object=deleted_duplicate_object,
    )
|
|
|
|
|
|
def confirm_uploaded_file_asset(asset: FileAsset, request=None, attributes: dict | None = None) -> FileAsset:
    """
    Mark an asset's upload as confirmed.

    When ``attributes`` is given it replaces the asset's attributes and is
    saved first. Assets that support blob deduplication are then finalized
    against a StoredBlob; all other assets just get their storage metadata
    refreshed and ``is_uploaded`` set.

    Returns:
        The finalized FileAsset row for dedup-capable assets, otherwise the
        same ``asset`` instance that was passed in.
    """
    if attributes is not None:
        asset.attributes = attributes
        asset.save(update_fields=["attributes", "updated_at"])

    if not supports_blob_dedup(asset):
        legacy_storage = _server_storage(request=request)
        fetched_metadata = legacy_storage.get_object_metadata(object_name=_object_key(asset))
        # Keep the previously stored metadata when storage returns nothing.
        asset.storage_metadata = fetched_metadata or asset.storage_metadata
        asset.is_uploaded = True
        asset.save(update_fields=["storage_metadata", "is_uploaded", "updated_at"])
        return asset

    finalize_result = finalize_uploaded_file_asset(asset, request=request)
    return finalize_result.asset
|
|
|
|
|
|
def release_file_asset_blob(
    asset: FileAsset,
    request=None,
    delete_untracked_object: bool = False,
) -> FileBlobReleaseResult:
    """
    Release the blob reference held by a FileAsset.

    The canonical object is deleted only when the last blob-backed FileAsset
    releases it. Legacy non-blob assets are left intact unless the caller marks
    them as temporary/untracked via ``delete_untracked_object``.

    Storage deletion is best-effort: an exception from the delete call is
    logged and reported as ``deleted_object=False``.
    """
    storage = _server_storage(request=request)
    key_to_delete = None
    had_blob = False

    with transaction.atomic():
        asset_row = FileAsset.all_objects.select_for_update().get(pk=asset.pk)
        current_key = _object_key(asset_row)
        had_blob = bool(asset_row.blob_id)

        if had_blob:
            blob = StoredBlob.objects.select_for_update().get(pk=asset_row.blob_id)
            remaining_refs = max(blob.ref_count - 1, 0)
            blob.ref_count = remaining_refs
            # The last reference is gone: orphan the blob and schedule its object for removal.
            if remaining_refs == 0 and blob.status == StoredBlob.Status.ACTIVE:
                blob.status = StoredBlob.Status.ORPHANED
                key_to_delete = blob.canonical_object_key
            blob.save(update_fields=["ref_count", "status", "updated_at"])

            asset_row.blob = None
            asset_row.save(update_fields=["blob", "updated_at"])
        elif delete_untracked_object and current_key:
            key_to_delete = current_key

    # Mirror the detached state onto the caller's instance.
    if had_blob:
        asset.blob_id = None

    if not key_to_delete:
        return FileBlobReleaseResult(had_blob=had_blob, deleted_object=False, object_key=key_to_delete)

    try:
        deleted_object = storage.delete_files(object_names=[key_to_delete])
    except Exception as exc:
        log_exception(exc)
        deleted_object = False

    return FileBlobReleaseResult(had_blob=had_blob, deleted_object=deleted_object, object_key=key_to_delete)
|
|
|
|
|
|
def _delete_legacy_asset_object(asset: FileAsset, storage: S3Storage) -> bool:
    """
    Delete a non-blob asset's storage object unless another FileAsset row uses the same key.

    Returns False when the asset has no object key or the key is still shared;
    otherwise returns the result of the storage delete call.
    """
    key = _object_key(asset)
    if key:
        shared_with_other_rows = FileAsset.all_objects.filter(asset=key).exclude(pk=asset.pk).exists()
        if not shared_with_other_rows:
            return storage.delete_files(object_names=[key])
    return False
|
|
|
|
|
|
def hard_delete_file_asset(asset: FileAsset, request=None, storage: S3Storage | None = None) -> bool:
    """
    Permanently remove a FileAsset row and release its storage reference.

    Blob-backed assets release the StoredBlob ref-count, and the blob row is
    removed too once it is ORPHANED with no references left. Legacy non-blob
    assets are physically deleted only when no other FileAsset row still points
    at the same object key.

    Returns:
        Whether a storage object was reported as deleted.
    """
    storage = storage or _server_storage(request=request)
    blob_id = asset.blob_id

    if not blob_id:
        object_removed = _delete_legacy_asset_object(asset, storage)
    else:
        object_removed = release_file_asset_blob(asset, request=request).deleted_object
        # Drop the blob row only if the release above left it orphaned and unreferenced.
        orphaned_blob_rows = StoredBlob.all_objects.filter(
            pk=blob_id,
            ref_count=0,
            status=StoredBlob.Status.ORPHANED,
        )
        orphaned_blob_rows.delete()

    asset.delete(soft=False)
    return object_removed
|
|
|
|
|
|
def attach_existing_blob_to_file_asset(source: FileAsset, target: FileAsset) -> bool:
    """
    Point ``target`` at the StoredBlob already referenced by ``source``.

    The blob's ref-count is incremented and the target row is rewritten to use
    the blob's canonical object key, size and storage metadata. The in-memory
    ``target`` is updated with every field written to the row, so a later
    unscoped ``target.save()`` cannot overwrite them with stale values.

    Args:
        source: Asset whose blob should be shared. Its ``blob`` field is
            refreshed from the database when not set in memory.
        target: Asset to attach to the same blob.

    Returns:
        True when the target references the source's blob on return. False
        when the source has no blob, the blob row no longer exists, or the
        blob is not ACTIVE (elsewhere in this module a blob's canonical object
        is scheduled for deletion when it leaves ACTIVE, so it may be gone).
    """
    if not source.blob_id:
        source.refresh_from_db(fields=["blob"])

    if not source.blob_id:
        return False

    with transaction.atomic():
        # filter().first() rather than get(): the row may have been
        # hard-deleted since ``source`` was loaded.
        source_blob = StoredBlob.objects.select_for_update().filter(pk=source.blob_id).first()
        if source_blob is None or source_blob.status != StoredBlob.Status.ACTIVE:
            return False

        locked_target = FileAsset.objects.select_for_update().get(pk=target.pk)
        if locked_target.blob_id == source_blob.id:
            # Already attached; do not count the reference twice.
            return True

        # NOTE(review): if the target already referenced a different blob, that
        # blob's ref_count is not decremented here — confirm callers only pass
        # targets that have no blob yet.
        source_blob.ref_count += 1
        source_blob.save(update_fields=["ref_count", "updated_at"])

        attributes = dict(locked_target.attributes or {})
        attributes["sha256"] = source_blob.sha256
        attributes["blob_id"] = str(source_blob.id)
        attributes["deduplicated"] = True

        locked_target.blob = source_blob
        locked_target.asset = source_blob.canonical_object_key
        locked_target.size = source_blob.size
        locked_target.attributes = attributes
        locked_target.storage_metadata = source_blob.storage_metadata
        locked_target.is_uploaded = True
        locked_target.save(
            update_fields=[
                "blob",
                "asset",
                "size",
                "attributes",
                "storage_metadata",
                "is_uploaded",
                "updated_at",
            ]
        )

    # Mirror the persisted state onto the caller's instance, as
    # finalize_uploaded_file_asset does.
    target.blob_id = source_blob.id
    target.asset = source_blob.canonical_object_key
    target.size = locked_target.size
    target.attributes = locked_target.attributes
    target.storage_metadata = locked_target.storage_metadata
    target.is_uploaded = locked_target.is_uploaded
    return True
|