|
import base64
import os.path
import traceback
import uuid
from pathlib import Path
from typing import Optional

import aioboto3
import aiofiles

from metagpt.config import CONFIG
from metagpt.const import BASE64_FORMAT
from metagpt.logs import logger
|
|
class S3:
    """A class for interacting with Amazon S3 storage."""

    def __init__(self):
        self.session = aioboto3.Session()
        self.s3_config = CONFIG.S3
        self.auth_config = {
            "service_name": "s3",
            "aws_access_key_id": self.s3_config["access_key"],
            "aws_secret_access_key": self.s3_config["secret_key"],
            "endpoint_url": self.s3_config["endpoint_url"],
        }
|
    async def upload_file(
        self,
        bucket: str,
        local_path: str,
        object_name: str,
    ) -> None:
        """Upload a file from a local path to the specified object path in an S3 bucket.

        Args:
            bucket: The name of the S3 storage bucket.
            local_path: The local file path, including the file name.
            object_name: The full S3 object key under which the file is stored, including the file name.

        Raises:
            Exception: If an error occurs during the upload, it is logged and re-raised.
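
        Example:
            A minimal, illustrative call (the bucket and paths are placeholders):

                s3 = S3()
                await s3.upload_file("my-bucket", "/tmp/report.pdf", "reports/report.pdf")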
|
""" |
|
try: |
|
async with self.session.client(**self.auth_config) as client: |
|
async with aiofiles.open(local_path, mode="rb") as reader: |
|
body = await reader.read() |
|
await client.put_object(Body=body, Bucket=bucket, Key=object_name) |
|
logger.info(f"Successfully uploaded the file to path {object_name} in bucket {bucket} of s3.") |
|
except Exception as e: |
|
logger.error(f"Failed to upload the file to path {object_name} in bucket {bucket} of s3: {e}") |
|
raise e |
|
|
|
    async def get_object_url(
        self,
        bucket: str,
        object_name: str,
    ) -> str:
        """Get a download or preview URL for a file stored in the specified S3 bucket.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The full S3 object key of the stored file, including the file name.

        Returns:
            The URL of the downloadable or previewable file.

        Raises:
            Exception: If an error occurs while retrieving the URL, it is logged and re-raised.
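
        Example:
            Illustrative usage (the bucket and key are placeholders):

                url = await S3().get_object_url("my-bucket", "reports/report.pdf")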
|
""" |
|
try: |
|
async with self.session.client(**self.auth_config) as client: |
|
file = await client.get_object(Bucket=bucket, Key=object_name) |
|
return str(file["Body"].url) |
|
except Exception as e: |
|
logger.error(f"Failed to get the url for a downloadable or preview file: {e}") |
|
raise e |
|
|
|
    async def get_object(
        self,
        bucket: str,
        object_name: str,
    ) -> bytes:
        """Get the binary data of a file stored in the specified S3 bucket.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The full S3 object key of the stored file, including the file name.

        Returns:
            The binary data of the requested file.

        Raises:
            Exception: If an error occurs while retrieving the file data, it is logged and re-raised.
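
        Example:
            Illustrative usage (the bucket and key are placeholders):

                content = await S3().get_object("my-bucket", "reports/report.pdf")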
|
""" |
|
try: |
|
async with self.session.client(**self.auth_config) as client: |
|
s3_object = await client.get_object(Bucket=bucket, Key=object_name) |
|
return await s3_object["Body"].read() |
|
except Exception as e: |
|
logger.error(f"Failed to get the binary data of the file: {e}") |
|
raise e |
|
|
|
    async def download_file(
        self, bucket: str, object_name: str, local_path: str, chunk_size: Optional[int] = 128 * 1024
    ) -> None:
        """Download an S3 object to a local file.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The full S3 object key of the stored file, including the file name.
            local_path: The local file path to which the S3 object is downloaded.
            chunk_size: The size of the data chunks read and written at a time, in bytes. Defaults to 128 KB.

        Raises:
            Exception: If an error occurs during the download, it is logged and re-raised.
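
        Example:
            Illustrative usage (the bucket, key, and local path are placeholders):

                await S3().download_file("my-bucket", "reports/report.pdf", "/tmp/report.pdf")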
|
""" |
|
try: |
|
async with self.session.client(**self.auth_config) as client: |
|
s3_object = await client.get_object(Bucket=bucket, Key=object_name) |
|
stream = s3_object["Body"] |
|
async with aiofiles.open(local_path, mode="wb") as writer: |
|
while True: |
|
file_data = await stream.read(chunk_size) |
|
if not file_data: |
|
break |
|
await writer.write(file_data) |
|
except Exception as e: |
|
logger.error(f"Failed to download the file from S3: {e}") |
|
raise e |
|
|
|
    async def cache(self, data: str, file_ext: str, format: str = "") -> Optional[str]:
        """Save data to remote S3 and return its URL, or None if the upload fails.
|
        object_name = uuid.uuid4().hex + file_ext
        path = Path(__file__).parent
        pathname = path / object_name
        try:
            async with aiofiles.open(str(pathname), mode="wb") as file:
                # Base64 payloads are decoded to bytes; plain strings are encoded as UTF-8,
                # since the file is opened in binary mode.
                data = base64.b64decode(data) if format == BASE64_FORMAT else data.encode("utf-8")
                await file.write(data)

            bucket = CONFIG.S3.get("bucket")
            object_pathname = CONFIG.S3.get("path") or "system"
            object_pathname += f"/{object_name}"
            object_pathname = os.path.normpath(object_pathname)
            await self.upload_file(bucket=bucket, local_path=str(pathname), object_name=object_pathname)
            pathname.unlink(missing_ok=True)

            return await self.get_object_url(bucket=bucket, object_name=object_pathname)
        except Exception as e:
            logger.exception(f"{e}, stack:{traceback.format_exc()}")
            pathname.unlink(missing_ok=True)
            return None
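

# An end-to-end usage sketch, kept as a comment for illustration. It assumes CONFIG.S3
# is populated with the keys read above ("access_key", "secret_key", "endpoint_url",
# plus "bucket" and, optionally, "path" for cache()) and that the bucket exists:
#
#   import asyncio
#
#   async def main():
#       s3 = S3()
#       await s3.upload_file("my-bucket", "/tmp/a.txt", "uploads/a.txt")
#       data = await s3.get_object("my-bucket", "uploads/a.txt")
#       await s3.download_file("my-bucket", "uploads/a.txt", "/tmp/a_copy.txt")
#       url = await s3.cache("hello, world", ".txt")
#
#   asyncio.run(main())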
|
|