NODEDC_TASKMANAGER/plane-src/apps/api/plane/utils/file_dedup.py

322 lines
11 KiB
Python

# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
import hashlib
from dataclasses import dataclass
from botocore.exceptions import ClientError
from django.db import IntegrityError, transaction
from plane.db.models import FileAsset, StoredBlob
from plane.settings.storage import S3Storage
from plane.utils.exception_logger import log_exception
class UploadedObjectMissing(Exception):
    """Signal that the storage object a FileAsset refers to could not be read from storage."""
@dataclass(frozen=True)
class FileBlobFinalizeResult:
    """Immutable summary of attaching an uploaded FileAsset to a StoredBlob."""

    # The FileAsset as stored after finalization.
    asset: FileAsset
    # The StoredBlob the asset now references.
    blob: StoredBlob
    # Hex-encoded SHA-256 digest of the uploaded content.
    sha256: str
    # True when the asset was repointed at an existing blob with a different object key.
    deduplicated: bool
    # Result reported by the storage delete call for redundant objects.
    deleted_duplicate_object: bool
@dataclass(frozen=True)
class FileBlobReleaseResult:
had_blob: bool
deleted_object: bool
object_key: str | None = None
def supports_blob_dedup(asset: FileAsset) -> bool:
    """Report whether ``asset`` can take part in blob deduplication.

    Blobs are looked up and created per workspace, so only assets that carry a
    workspace id qualify.
    """
    has_workspace = asset.workspace_id
    return True if has_workspace else False
def _object_key(asset: FileAsset) -> str:
    """Return the storage key of the object referenced by ``asset.asset``.

    Prefers the file field's ``name`` and falls back to the string form of the
    field value when the name is empty.
    """
    stored_file = asset.asset
    return str(stored_file.name or stored_file)
def _server_storage(request=None) -> S3Storage:
    """Build an ``S3Storage`` with ``is_server=True`` for backend-side object access.

    NOTE(review): what ``is_server`` changes is defined in S3Storage — confirm there.
    """
    return S3Storage(is_server=True, request=request)
def _get_object_metadata(storage: S3Storage, object_key: str) -> dict:
    """Fetch storage metadata for ``object_key``.

    Raises:
        UploadedObjectMissing: storage returned no (falsy) metadata for the key.
    """
    object_metadata = storage.get_object_metadata(object_name=object_key)
    if object_metadata:
        return object_metadata
    raise UploadedObjectMissing(f"Storage object not found: {object_key}")
def _calculate_object_sha256(storage: S3Storage, object_key: str) -> str:
    """Stream ``object_key`` from the storage bucket and return its hex SHA-256 digest.

    The body is read in 1 MiB chunks so large objects are never held in memory,
    and the stream is always closed.

    Raises:
        UploadedObjectMissing: the ``get_object`` call failed. Note that any
            botocore ``ClientError`` (not only a missing key) is reported this way.
    """
    chunk_bytes = 1024 * 1024
    try:
        response = storage.s3_client.get_object(
            Bucket=storage.aws_storage_bucket_name,
            Key=object_key,
        )
    except ClientError as exc:
        raise UploadedObjectMissing(f"Storage object not found: {object_key}") from exc

    hasher = hashlib.sha256()
    stream = response["Body"]
    try:
        for piece in stream.iter_chunks(chunk_size=chunk_bytes):
            if not piece:
                continue
            hasher.update(piece)
    finally:
        stream.close()
    return hasher.hexdigest()
def _resolve_size(asset: FileAsset, metadata: dict) -> int:
    """Pick the object's size in bytes.

    Sources are tried in this order:
    1. the storage ``ContentLength``;
    2. the ``size`` attribute the client declared, which may be a float or a string;
    3. ``asset.size``, defaulting to 0.
    """
    content_length = metadata.get("ContentLength")
    if content_length is None:
        declared_size = (asset.attributes or {}).get("size")
        if declared_size is None:
            return int(asset.size or 0)
        return int(float(declared_size))
    return int(content_length)
def _resolve_mime_type(asset: FileAsset, metadata: dict) -> str:
    """Pick the MIME type.

    Prefers the asset's ``type`` attribute, then the storage ``ContentType``,
    and falls back to an empty string.
    """
    attrs = asset.attributes or {}
    mime = attrs.get("type") or metadata.get("ContentType") or ""
    return str(mime)
def _find_active_blob_for_update(workspace_id, sha256: str, size: int) -> StoredBlob | None:
    """Return the row-locked ACTIVE, non-deleted blob with this content identity, or None."""
    return (
        StoredBlob.objects.select_for_update()
        .filter(
            workspace_id=workspace_id,
            sha256=sha256,
            size=size,
            status=StoredBlob.Status.ACTIVE,
            deleted_at__isnull=True,
        )
        .first()
    )


def _release_blob_reference(blob_id) -> str | None:
    """
    Drop one reference from the blob ``blob_id`` and row-lock it while doing so.

    When the last reference goes away on an ACTIVE blob, the blob is marked
    ORPHANED and its canonical object key is returned so the caller can delete
    the object. Otherwise, including when the blob row no longer exists, the
    function returns None.
    """
    released_blob = StoredBlob.objects.select_for_update().filter(pk=blob_id).first()
    if released_blob is None:
        return None
    released_key = None
    released_blob.ref_count = max(released_blob.ref_count - 1, 0)
    if released_blob.ref_count == 0 and released_blob.status == StoredBlob.Status.ACTIVE:
        released_blob.status = StoredBlob.Status.ORPHANED
        released_key = released_blob.canonical_object_key
    released_blob.save(update_fields=["ref_count", "status", "updated_at"])
    return released_key


def _object_keys_safe_to_delete(object_keys, keep_key: str) -> list[str]:
    """
    Reduce ``object_keys`` to storage keys that no live blob depends on.

    The following are dropped:
    - empty keys;
    - ``keep_key`` (the object the asset now points at);
    - any key that is still the canonical object of an ACTIVE, non-deleted blob.

    Call this after the blob status updates have been saved.
    """
    candidates = {key for key in object_keys if key and key != keep_key}
    if not candidates:
        return []
    still_referenced = set(
        StoredBlob.objects.filter(
            canonical_object_key__in=candidates,
            status=StoredBlob.Status.ACTIVE,
            deleted_at__isnull=True,
        ).values_list("canonical_object_key", flat=True)
    )
    return sorted(candidates - still_referenced)


def finalize_uploaded_file_asset(asset: FileAsset, request=None) -> FileBlobFinalizeResult:
    """
    Attach a confirmed upload to a canonical StoredBlob.

    The uploaded object is treated as a temporary candidate until its SHA-256 is
    known. Then one of two things happens:
    - If an ACTIVE blob with the same hash and size already exists in the
      workspace, the FileAsset is repointed to that blob's canonical object and
      the candidate object is removed from storage.
    - Otherwise a new blob is created with the candidate as its canonical object.

    If the asset previously referenced a different blob, that reference is
    released. A blob left with no references is marked ORPHANED and its object
    is deleted. A key that is still the canonical object of an ACTIVE blob is
    never deleted.

    Storage deletion runs after the database transaction commits and is
    best-effort: failures are logged and reported as
    ``deleted_duplicate_object=False`` instead of raising.

    Args:
        asset: FileAsset whose ``asset`` key points at the uploaded object. Its
            in-memory blob/asset/size/attributes/storage_metadata/is_uploaded
            fields are synced with the database before returning.
        request: Optional request passed through to S3Storage.

    Returns:
        FileBlobFinalizeResult describing the attached blob.

    Raises:
        ValueError: the asset has no workspace.
        UploadedObjectMissing: the uploaded object cannot be read from storage.
    """
    if not asset.workspace_id:
        raise ValueError("FileAsset workspace is required for blob deduplication.")

    storage = _server_storage(request=request)
    candidate_key = _object_key(asset)
    # Read and hash the candidate before taking row locks: hashing streams the
    # whole object from storage and may be slow.
    candidate_metadata = _get_object_metadata(storage, candidate_key)
    candidate_sha256 = _calculate_object_sha256(storage, candidate_key)
    candidate_size = _resolve_size(asset, candidate_metadata)
    candidate_mime_type = _resolve_mime_type(asset, candidate_metadata)

    duplicate_object_key = None
    released_object_key = None
    with transaction.atomic():
        locked_asset = FileAsset.objects.select_for_update().get(pk=asset.pk)
        previous_blob_id = locked_asset.blob_id

        blob = _find_active_blob_for_update(locked_asset.workspace_id, candidate_sha256, candidate_size)
        if blob is None:
            try:
                # Inner savepoint: if a concurrent finalize created the same blob
                # first (presumably tripping a unique constraint), only this
                # savepoint is rolled back and the outer transaction stays usable.
                with transaction.atomic():
                    blob = StoredBlob.objects.create(
                        workspace_id=locked_asset.workspace_id,
                        sha256=candidate_sha256,
                        size=candidate_size,
                        mime_type=candidate_mime_type,
                        canonical_object_key=candidate_key,
                        status=StoredBlob.Status.ACTIVE,
                        ref_count=0,
                        storage_metadata=candidate_metadata,
                    )
            except IntegrityError:
                blob = _find_active_blob_for_update(locked_asset.workspace_id, candidate_sha256, candidate_size)
                if blob is None:
                    # The conflict was not with an ACTIVE blob of this identity;
                    # surface the real cause instead of a misleading DoesNotExist.
                    raise

        deduplicated = blob.canonical_object_key != candidate_key
        if deduplicated:
            duplicate_object_key = candidate_key

        if previous_blob_id != blob.id:
            if previous_blob_id:
                released_object_key = _release_blob_reference(previous_blob_id)
            blob.ref_count += 1
            blob.save(update_fields=["ref_count", "updated_at"])

        attributes = dict(locked_asset.attributes or {})
        attributes["size"] = candidate_size
        attributes["type"] = candidate_mime_type
        attributes["sha256"] = candidate_sha256
        attributes["blob_id"] = str(blob.id)
        attributes["deduplicated"] = deduplicated
        locked_asset.blob = blob
        locked_asset.asset = blob.canonical_object_key
        locked_asset.size = candidate_size
        locked_asset.attributes = attributes
        locked_asset.storage_metadata = blob.storage_metadata or candidate_metadata
        locked_asset.is_uploaded = True
        locked_asset.save(
            update_fields=[
                "blob",
                "asset",
                "size",
                "attributes",
                "storage_metadata",
                "is_uploaded",
                "updated_at",
            ]
        )

        # Decide deletions while the blob rows are still locked, so a key that
        # is (still or newly) canonical for an ACTIVE blob is never removed.
        object_keys_to_delete = _object_keys_safe_to_delete(
            (duplicate_object_key, released_object_key),
            keep_key=blob.canonical_object_key,
        )

    deleted_duplicate_object = False
    if object_keys_to_delete:
        try:
            deleted_duplicate_object = storage.delete_files(object_names=object_keys_to_delete)
        except Exception as exc:
            # The database changes are already committed; a leftover object is
            # preferable to failing the confirmation.
            log_exception(exc)
            deleted_duplicate_object = False

    locked_asset.refresh_from_db()
    asset.blob_id = locked_asset.blob_id
    asset.asset = locked_asset.asset
    asset.size = locked_asset.size
    asset.attributes = locked_asset.attributes
    asset.storage_metadata = locked_asset.storage_metadata
    asset.is_uploaded = locked_asset.is_uploaded
    return FileBlobFinalizeResult(
        asset=locked_asset,
        blob=blob,
        sha256=candidate_sha256,
        deduplicated=deduplicated,
        deleted_duplicate_object=deleted_duplicate_object,
    )
def confirm_uploaded_file_asset(asset: FileAsset, request=None, attributes: dict | None = None) -> FileAsset:
    """
    Confirm that the object for ``asset`` has been uploaded.

    If ``attributes`` is given, it replaces the asset's attributes and is saved
    immediately.

    Assets that support blob deduplication (they have a workspace) are passed
    to ``finalize_uploaded_file_asset``, and the FileAsset from its result is
    returned.

    Other assets have their ``storage_metadata`` refreshed from storage (the
    old value is kept when storage returns nothing), are marked
    ``is_uploaded=True``, are saved, and are returned.
    """
    if attributes is not None:
        asset.attributes = attributes
        asset.save(update_fields=["attributes", "updated_at"])

    if not supports_blob_dedup(asset):
        storage = _server_storage(request=request)
        fresh_metadata = storage.get_object_metadata(object_name=_object_key(asset))
        asset.storage_metadata = fresh_metadata or asset.storage_metadata
        asset.is_uploaded = True
        asset.save(update_fields=["storage_metadata", "is_uploaded", "updated_at"])
        return asset

    return finalize_uploaded_file_asset(asset, request=request).asset
def release_file_asset_blob(
    asset: FileAsset,
    request=None,
    delete_untracked_object: bool = False,
) -> FileBlobReleaseResult:
    """
    Drop the StoredBlob reference held by ``asset``.

    For a blob-backed asset:
    - the blob's ref_count is decremented (never below zero);
    - the asset is detached from the blob;
    - when the count reaches zero on an ACTIVE blob, the blob is marked
      ORPHANED and its canonical object is deleted from storage.

    For an asset without a blob, the referenced object is deleted only when
    ``delete_untracked_object`` is True.

    Storage deletion runs after the transaction and is best-effort: exceptions
    are logged and reported as ``deleted_object=False``.
    """
    storage = _server_storage(request=request)
    key_to_delete = None
    released_blob = False

    with transaction.atomic():
        row = FileAsset.all_objects.select_for_update().get(pk=asset.pk)
        object_key = _object_key(row)
        if not row.blob_id:
            # Legacy asset with no blob: its object is only removed on request.
            if delete_untracked_object and object_key:
                key_to_delete = object_key
        else:
            released_blob = True
            stored = StoredBlob.objects.select_for_update().get(pk=row.blob_id)
            stored.ref_count = max(stored.ref_count - 1, 0)
            last_reference = stored.ref_count == 0 and stored.status == StoredBlob.Status.ACTIVE
            if last_reference:
                stored.status = StoredBlob.Status.ORPHANED
                key_to_delete = stored.canonical_object_key
            stored.save(update_fields=["ref_count", "status", "updated_at"])
            row.blob = None
            row.save(update_fields=["blob", "updated_at"])

    if released_blob:
        asset.blob_id = None

    object_deleted = False
    if key_to_delete:
        try:
            object_deleted = storage.delete_files(object_names=[key_to_delete])
        except Exception as exc:
            log_exception(exc)
            object_deleted = False
    return FileBlobReleaseResult(had_blob=released_blob, deleted_object=object_deleted, object_key=key_to_delete)
def attach_existing_blob_to_file_asset(source: FileAsset, target: FileAsset) -> bool:
    """
    Make ``target`` reference the same StoredBlob as ``source``.

    The target is repointed at the blob's canonical object and the blob's
    ref_count is incremented.

    If the target previously referenced a different blob, that reference is
    released. A blob left with no references is marked ORPHANED, and its object
    is deleted (best-effort, after commit) unless the key is still canonical
    for an ACTIVE blob.

    Returns:
        True when ``target`` references ``source``'s blob afterwards. False when
        ``source`` has no blob, or its blob is no longer ACTIVE (for example, it
        was released concurrently). In the False case the caller must provide
        the content some other way.
    """
    if not source.blob_id:
        source.refresh_from_db(fields=["blob"])
    if not source.blob_id:
        return False

    released_object_key = None
    with transaction.atomic():
        source_blob = StoredBlob.objects.select_for_update().get(pk=source.blob_id)
        # source.blob_id was read without a lock, so the blob may have been
        # orphaned (and its object deleted) before we locked it.
        if source_blob.status != StoredBlob.Status.ACTIVE or source_blob.deleted_at is not None:
            return False

        locked_target = FileAsset.objects.select_for_update().get(pk=target.pk)
        if locked_target.blob_id == source_blob.id:
            return True

        previous_blob_id = locked_target.blob_id
        if previous_blob_id:
            # Release the target's old reference so that blob's ref_count does not leak.
            previous_blob = StoredBlob.objects.select_for_update().filter(pk=previous_blob_id).first()
            if previous_blob is not None:
                previous_blob.ref_count = max(previous_blob.ref_count - 1, 0)
                if previous_blob.ref_count == 0 and previous_blob.status == StoredBlob.Status.ACTIVE:
                    previous_blob.status = StoredBlob.Status.ORPHANED
                    released_object_key = previous_blob.canonical_object_key
                previous_blob.save(update_fields=["ref_count", "status", "updated_at"])

        source_blob.ref_count += 1
        source_blob.save(update_fields=["ref_count", "updated_at"])

        attributes = dict(locked_target.attributes or {})
        attributes["sha256"] = source_blob.sha256
        attributes["blob_id"] = str(source_blob.id)
        attributes["deduplicated"] = True
        locked_target.blob = source_blob
        locked_target.asset = source_blob.canonical_object_key
        locked_target.size = source_blob.size
        locked_target.attributes = attributes
        locked_target.storage_metadata = source_blob.storage_metadata
        locked_target.is_uploaded = True
        locked_target.save(
            update_fields=[
                "blob",
                "asset",
                "size",
                "attributes",
                "storage_metadata",
                "is_uploaded",
                "updated_at",
            ]
        )

        # Never delete an object that is still canonical for an ACTIVE blob
        # (this also covers the source blob's own key).
        if released_object_key and StoredBlob.objects.filter(
            canonical_object_key=released_object_key,
            status=StoredBlob.Status.ACTIVE,
            deleted_at__isnull=True,
        ).exists():
            released_object_key = None

    if released_object_key:
        try:
            _server_storage().delete_files(object_names=[released_object_key])
        except Exception as exc:
            # Database state is already committed; leaving an orphaned object is
            # preferable to failing the attach.
            log_exception(exc)

    target.blob_id = source_blob.id
    target.asset = source_blob.canonical_object_key
    target.size = locked_target.size
    target.attributes = locked_target.attributes
    target.storage_metadata = locked_target.storage_metadata
    target.is_uploaded = locked_target.is_uploaded
    return True