ФУНКЦИИ - МЕЖПРОЕКТНАЯ КОММУНИКАЦИЯ: подтверждение файловых загрузок через внутренний storage
This commit is contained in:
parent
490aa1bc04
commit
3e328531ec
|
|
@ -22,7 +22,7 @@ class S3Storage(S3Boto3Storage):
|
|||
|
||||
"""S3 storage class to generate presigned URLs for S3 objects"""
|
||||
|
||||
def __init__(self, request=None, **kwargs):
|
||||
def __init__(self, request=None, is_server=False, use_internal_endpoint=False, **kwargs):
|
||||
# Get the AWS credentials and bucket name from the environment
|
||||
self.aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
|
||||
# Use the AWS_SECRET_ACCESS_KEY environment variable for the secret key
|
||||
|
|
@ -36,19 +36,26 @@ class S3Storage(S3Boto3Storage):
|
|||
# Use the SIGNED_URL_EXPIRATION environment variable for the expiration time (default: 3600 seconds)
|
||||
self.signed_url_expiration = int(os.environ.get("SIGNED_URL_EXPIRATION", "3600"))
|
||||
|
||||
self.use_internal_endpoint = bool(is_server or use_internal_endpoint)
|
||||
|
||||
if os.environ.get("USE_MINIO") == "1":
|
||||
# Determine protocol based on environment variable
|
||||
if os.environ.get("MINIO_ENDPOINT_SSL") == "1":
|
||||
endpoint_protocol = "https"
|
||||
else:
|
||||
endpoint_protocol = request.scheme if request else "http"
|
||||
endpoint_protocol = request.scheme if request and not self.use_internal_endpoint else "http"
|
||||
|
||||
endpoint_url = self.aws_s3_endpoint_url
|
||||
if request and not self.use_internal_endpoint:
|
||||
endpoint_url = f"{endpoint_protocol}://{request.get_host()}"
|
||||
|
||||
# Create an S3 client for MinIO
|
||||
self.s3_client = boto3.client(
|
||||
"s3",
|
||||
aws_access_key_id=self.aws_access_key_id,
|
||||
aws_secret_access_key=self.aws_secret_access_key,
|
||||
region_name=self.aws_region,
|
||||
endpoint_url=(f"{endpoint_protocol}://{request.get_host()}" if request else self.aws_s3_endpoint_url),
|
||||
endpoint_url=endpoint_url,
|
||||
config=boto3.session.Config(signature_version="s3v4"),
|
||||
)
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -204,3 +204,49 @@ class TestS3StorageSignedURLExpiration:
|
|||
mock_s3_client.generate_presigned_url.assert_called_once()
|
||||
call_kwargs = mock_s3_client.generate_presigned_url.call_args[1]
|
||||
assert call_kwargs["ExpiresIn"] == 120
|
||||
|
||||
@patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"AWS_ACCESS_KEY_ID": "test-key",
|
||||
"AWS_SECRET_ACCESS_KEY": "test-secret",
|
||||
"AWS_S3_BUCKET_NAME": "test-bucket",
|
||||
"AWS_REGION": "us-east-1",
|
||||
"AWS_S3_ENDPOINT_URL": "http://plane-minio:9000",
|
||||
"USE_MINIO": "1",
|
||||
},
|
||||
clear=True,
|
||||
)
|
||||
@patch("plane.settings.storage.boto3")
|
||||
def test_minio_request_uses_request_host_for_browser_urls(self, mock_boto3):
|
||||
"""Test that browser-facing MinIO URLs keep using the request host"""
|
||||
request = Mock(scheme="http")
|
||||
request.get_host.return_value = "localhost:8090"
|
||||
|
||||
S3Storage(request=request)
|
||||
|
||||
call_kwargs = mock_boto3.client.call_args[1]
|
||||
assert call_kwargs["endpoint_url"] == "http://localhost:8090"
|
||||
|
||||
@patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"AWS_ACCESS_KEY_ID": "test-key",
|
||||
"AWS_SECRET_ACCESS_KEY": "test-secret",
|
||||
"AWS_S3_BUCKET_NAME": "test-bucket",
|
||||
"AWS_REGION": "us-east-1",
|
||||
"AWS_S3_ENDPOINT_URL": "http://plane-minio:9000",
|
||||
"USE_MINIO": "1",
|
||||
},
|
||||
clear=True,
|
||||
)
|
||||
@patch("plane.settings.storage.boto3")
|
||||
def test_minio_server_mode_uses_internal_endpoint_with_request(self, mock_boto3):
|
||||
"""Test that server-side MinIO operations use the internal endpoint"""
|
||||
request = Mock(scheme="http")
|
||||
request.get_host.return_value = "localhost:8090"
|
||||
|
||||
S3Storage(request=request, is_server=True)
|
||||
|
||||
call_kwargs = mock_boto3.client.call_args[1]
|
||||
assert call_kwargs["endpoint_url"] == "http://plane-minio:9000"
|
||||
|
|
|
|||
|
|
@ -41,6 +41,10 @@ def _object_key(asset: FileAsset) -> str:
|
|||
return str(asset.asset.name or asset.asset)
|
||||
|
||||
|
||||
def _server_storage(request=None) -> S3Storage:
|
||||
return S3Storage(request=request, is_server=True)
|
||||
|
||||
|
||||
def _get_object_metadata(storage: S3Storage, object_key: str) -> dict:
|
||||
metadata = storage.get_object_metadata(object_name=object_key)
|
||||
if not metadata:
|
||||
|
|
@ -95,7 +99,7 @@ def finalize_uploaded_file_asset(asset: FileAsset, request=None) -> FileBlobFina
|
|||
if not asset.workspace_id:
|
||||
raise ValueError("FileAsset workspace is required for blob deduplication.")
|
||||
|
||||
storage = S3Storage(request=request)
|
||||
storage = _server_storage(request=request)
|
||||
candidate_key = _object_key(asset)
|
||||
candidate_metadata = _get_object_metadata(storage, candidate_key)
|
||||
candidate_sha256 = _calculate_object_sha256(storage, candidate_key)
|
||||
|
|
@ -217,7 +221,7 @@ def confirm_uploaded_file_asset(asset: FileAsset, request=None, attributes: dict
|
|||
if supports_blob_dedup(asset):
|
||||
return finalize_uploaded_file_asset(asset, request=request).asset
|
||||
|
||||
storage = S3Storage(request=request)
|
||||
storage = _server_storage(request=request)
|
||||
asset.storage_metadata = storage.get_object_metadata(object_name=_object_key(asset)) or asset.storage_metadata
|
||||
asset.is_uploaded = True
|
||||
asset.save(update_fields=["storage_metadata", "is_uploaded", "updated_at"])
|
||||
|
|
@ -237,12 +241,12 @@ def release_file_asset_blob(
|
|||
them as temporary/untracked.
|
||||
"""
|
||||
|
||||
storage = S3Storage(request=request)
|
||||
storage = _server_storage(request=request)
|
||||
delete_object_key = None
|
||||
had_blob = False
|
||||
|
||||
with transaction.atomic():
|
||||
locked_asset = FileAsset.objects.select_for_update().get(pk=asset.pk)
|
||||
locked_asset = FileAsset.all_objects.select_for_update().get(pk=asset.pk)
|
||||
current_key = _object_key(locked_asset)
|
||||
|
||||
if locked_asset.blob_id:
|
||||
|
|
|
|||
Loading…
Reference in New Issue