NODEDC_TASKMANAGER/plane-src/apps/api/plane/utils/attachment_preview.py

60 lines
1.9 KiB
Python

# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
from urllib.parse import quote
from botocore.exceptions import ClientError
from django.http import HttpResponse, StreamingHttpResponse
from plane.settings.storage import S3Storage
def attachment_object_exists(asset):
    """Return whether the stored object behind ``asset`` can be found.

    Issues a ``head_object`` request against the configured storage bucket
    for the key held in ``asset.asset.name``.

    Args:
        asset: Object whose ``asset.name`` attribute holds the storage key.

    Returns:
        ``True`` when ``head_object`` succeeds. ``False`` when the key is
        missing/empty or when the request raises ``ClientError``.
    """
    key = asset.asset.name
    # Without a key there is nothing to look up. Bail out before building a
    # storage client: ``str(None)`` would otherwise probe the literal key
    # "None", and an empty key is expected to be rejected by boto's
    # client-side parameter validation with an exception that the
    # ``except ClientError`` below would not catch (NOTE(review): confirm).
    if not key:
        return False

    storage = S3Storage(request=None)
    try:
        storage.s3_client.head_object(
            Bucket=storage.aws_storage_bucket_name,
            Key=str(key),
        )
    except ClientError:
        # Any client error (not only "not found") is reported as missing.
        return False
    return True
def _iter_and_close_body(body, chunk_size=8192):
    """Yield chunks from an S3 streaming body and close it afterwards."""
    try:
        yield from body.iter_chunks(chunk_size=chunk_size)
    finally:
        # Runs on normal exhaustion and when the generator is closed early
        # (e.g. the response is closed after a client disconnect), so the
        # underlying storage connection is released.
        body.close()


def get_attachment_preview_response(request, asset, disposition="inline"):
    """Stream a stored attachment back to the client.

    Fetches the object named by ``asset.asset.name`` from the configured
    bucket and wraps its body in a streaming response. An incoming ``Range``
    header is forwarded to storage so partial content can be served.

    Args:
        request: Incoming request; only ``request.META["HTTP_RANGE"]`` is read.
        asset: Object exposing ``asset.name`` (storage key) and an
            ``attributes`` mapping with optional ``"type"`` and ``"name"``.
        disposition: Value placed in front of the filename in the
            ``Content-Disposition`` header (defaults to ``"inline"``).

    Returns:
        A ``StreamingHttpResponse`` with status 200, or 206 when storage
        reports a ``ContentRange``. A plain ``HttpResponse`` with status 416
        when storage rejects the requested range, or 404 for any other
        ``ClientError``.
    """
    storage = S3Storage(request=None)
    range_header = request.META.get("HTTP_RANGE")

    request_kwargs = {
        "Bucket": storage.aws_storage_bucket_name,
        "Key": str(asset.asset.name),
    }
    if range_header:
        request_kwargs["Range"] = range_header

    try:
        storage_response = storage.s3_client.get_object(**request_kwargs)
    except ClientError as err:
        error = err.response.get("Error", {})
        if error.get("Code") == "InvalidRange":
            # The object exists but the requested range is unsatisfiable;
            # answering 404 here would mislead range-aware clients.
            range_response = HttpResponse(
                "Requested range not satisfiable.", status=416
            )
            # NOTE(review): S3 is believed to report the object size in the
            # error payload; advertise it when present - confirm the key.
            actual_size = error.get("ActualObjectSize")
            if actual_size:
                range_response["Content-Range"] = f"bytes */{actual_size}"
            return range_response
        return HttpResponse("Attachment object not found.", status=404)

    content_type = (
        asset.attributes.get("type")
        or storage_response.get("ContentType")
        or "application/octet-stream"
    )
    # safe="" so that "/" is percent-encoded too: it is not a permitted
    # character in an RFC 5987 extended parameter value.
    filename = quote(asset.attributes.get("name") or "attachment", safe="")

    content_range = storage_response.get("ContentRange")
    response = StreamingHttpResponse(
        _iter_and_close_body(storage_response["Body"], chunk_size=8192),
        status=206 if content_range else 200,
        content_type=content_type,
    )
    response["Content-Disposition"] = f"{disposition}; filename*=UTF-8''{filename}"
    response["Accept-Ranges"] = "bytes"

    content_length = storage_response.get("ContentLength")
    if content_length is not None:
        response["Content-Length"] = content_length
    if content_range:
        response["Content-Range"] = content_range
    return response