# Spaces: Validify-testbot-1 (status: Build error)
# File: botbuilder-python/libraries/botbuilder-azure/botbuilder/azure/blob_storage.py
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
from typing import Dict, List

from jsonpickle import encode
from jsonpickle.unpickler import Unpickler
from azure.core import MatchConditions
from azure.core.exceptions import (
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError,
)
from azure.storage.blob.aio import (
    BlobServiceClient,
    BlobClient,
    StorageStreamDownloader,
)
from botbuilder.core import Storage


class BlobStorageSettings:
    """Configuration values for the Azure Blob storage provider.

    Either ``connection_string`` or the pair ``account_name`` and
    ``account_key`` must be supplied. This class only stores the values;
    it performs no validation itself.

    :param container_name: Name of the Blob container.
    :type container_name: str
    :param account_name: Name of the Blob Storage account. Required if not using connection_string.
    :type account_name: str
    :param account_key: Key of the Blob Storage account. Required if not using connection_string.
    :type account_key: str
    :param connection_string: Connection string of the Blob Storage account.
        Required if not using account_name and account_key.
    :type connection_string: str
    """

    def __init__(
        self,
        container_name: str,
        account_name: str = "",
        account_key: str = "",
        connection_string: str = "",
    ):
        # Plain attribute storage; empty strings mean "not provided".
        self.container_name = container_name
        self.account_name = account_name
        self.account_key = account_key
        self.connection_string = connection_string


# New Azure Blob SDK only allows connection strings, but our SDK allows key+name.
# This is here for backwards compatibility.
def convert_account_name_and_key_to_connection_string(settings: "BlobStorageSettings"):
    """Build a connection string from the account name and key in ``settings``.

    :param settings: Settings object exposing ``account_name`` and ``account_key``.
    :type settings: :class:`BlobStorageSettings`
    :return: A connection string using the HTTPS protocol and the
        ``core.windows.net`` endpoint suffix.
    :rtype: str
    :raises Exception: If ``account_name`` or ``account_key`` is empty or missing.
    """
    # Both values are needed; a falsy value for either one is rejected.
    if not settings.account_name or not settings.account_key:
        raise Exception(
            "account_name and account_key are both required for BlobStorageSettings if not using a connections string."
        )
    return (
        f"DefaultEndpointsProtocol=https;AccountName={settings.account_name};"
        f"AccountKey={settings.account_key};EndpointSuffix=core.windows.net"
    )


class BlobStorage(Storage):
    """An Azure Blob based storage provider for a bot.

    This class uses a single Azure Storage Blob Container.
    Each entity or StoreItem is serialized into a JSON string and stored in an individual text blob.
    Each blob is named after the store item key; the key is handed to the container client unchanged.

    On read, the restored entity's ``e_tag`` is set to the blob's ETag. On write, an entity
    carrying a non-empty ``e_tag`` (other than ``"*"``) is uploaded with an "if not modified"
    match condition on that ETag; entities without an ``e_tag`` overwrite unconditionally.

    :param settings: Settings used to instantiate the Blob service.
    :type settings: :class:`botbuilder.azure.BlobStorageSettings`
    :raises Exception: If ``settings.container_name`` is empty, or if neither a connection
        string nor an account name/key pair is available.
    """

    def __init__(self, settings: BlobStorageSettings):
        if not settings.container_name:
            raise Exception("Container name is required.")

        # Prefer an explicit connection string; otherwise derive one from name+key
        # (the helper raises if either is missing).
        connection_string = (
            settings.connection_string
            or convert_account_name_and_key_to_connection_string(settings)
        )
        blob_service_client = BlobServiceClient.from_connection_string(
            connection_string
        )

        self.__container_client = blob_service_client.get_container_client(
            settings.container_name
        )
        self.__initialized = False

    async def _initialize(self):
        """Create the container on first use and remember that it was done.

        :return: ``True`` once initialization has run.
        :rtype: bool
        """
        if self.__initialized is False:
            # This should only happen once - assuming this is a singleton.
            # ContainerClient.exists() method is available in an unreleased version of the SDK.
            # Until then, we attempt creation and ignore "already exists".
            try:
                await self.__container_client.create_container()
            except ResourceExistsError:
                pass
            self.__initialized = True
        return self.__initialized

    async def read(self, keys: List[str]) -> Dict[str, object]:
        """Retrieve entities from the configured blob container.

        Keys whose blob does not exist (HTTP 404) are omitted from the result.

        :param keys: An array of entity keys.
        :type keys: List[str]
        :return: Mapping of each found key to its restored entity.
        :rtype: Dict[str, object]
        :raises Exception: If ``keys`` is empty or ``None``.
        :raises HttpResponseError: For any service error other than 404.
        """
        if not keys:
            raise Exception("Keys are required when reading")

        await self._initialize()

        items = {}
        for key in keys:
            blob_client = self.__container_client.get_blob_client(key)
            try:
                items[key] = await self._inner_read_blob(blob_client)
            except HttpResponseError as err:
                if err.status_code == 404:
                    # A missing blob simply means there is no stored entity for this key.
                    continue
                # Anything else (auth, throttling, server errors) must not be
                # mistaken for "no data", so surface it to the caller.
                raise
        return items

    async def write(self, changes: Dict[str, object]):
        """Stores a new entity in the configured blob container.

        :param changes: The changes to write to storage, keyed by blob name.
        :type changes: Dict[str, object]
        :return: None
        :raises Exception: If ``changes`` is ``None``, or an item carries an empty-string ``e_tag``.
        """
        if changes is None:
            raise Exception("Changes are required when writing")
        if not changes:
            return

        await self._initialize()

        for name, item in changes.items():
            blob_reference = self.__container_client.get_blob_client(name)

            # The ETag may live in a dict entry or in an attribute, depending on the item type.
            e_tag = None
            if isinstance(item, dict):
                e_tag = item.get("e_tag", None)
            elif hasattr(item, "e_tag"):
                e_tag = item.e_tag

            # "*" is treated the same as no ETag: overwrite unconditionally.
            e_tag = None if e_tag == "*" else e_tag
            if e_tag == "":
                raise Exception("blob_storage.write(): etag missing")

            item_str = self._store_item_to_str(item)

            if e_tag:
                # Conditional upload: only succeeds if the blob still has this ETag.
                await blob_reference.upload_blob(
                    item_str, match_condition=MatchConditions.IfNotModified, etag=e_tag
                )
            else:
                await blob_reference.upload_blob(item_str, overwrite=True)

    async def delete(self, keys: List[str]):
        """Deletes entity blobs from the configured container.

        Keys whose blob does not exist are ignored.

        :param keys: An array of entity keys.
        :type keys: List[str]
        :raises Exception: If ``keys`` is ``None``.
        """
        if keys is None:
            raise Exception("BlobStorage.delete: keys parameter can't be null")

        await self._initialize()

        for key in keys:
            blob_client = self.__container_client.get_blob_client(key)
            try:
                await blob_client.delete_blob()
            # We can't delete what's already gone.
            except ResourceNotFoundError:
                pass

    def _store_item_to_str(self, item: object) -> str:
        """Serialize ``item`` to a string using jsonpickle's ``encode``."""
        return encode(item)

    async def _inner_read_blob(self, blob_client: BlobClient):
        """Download the blob behind ``blob_client`` and restore the stored entity."""
        blob = await blob_client.download_blob()

        return await self._blob_to_store_item(blob)

    @staticmethod
    async def _blob_to_store_item(blob: StorageStreamDownloader) -> object:
        """Restore an entity from a downloaded blob and stamp it with the blob's ETag.

        Declared static because it takes only the downloader; it is invoked as
        ``self._blob_to_store_item(blob)`` from ``_inner_read_blob``.

        :param blob: The downloader returned by ``BlobClient.download_blob``.
        :type blob: StorageStreamDownloader
        :return: The restored entity.
        :rtype: object
        """
        item = json.loads(await blob.content_as_text())
        # The service returns the ETag wrapped in double quotes; store it bare.
        item["e_tag"] = blob.properties.etag.replace('"', "")
        # NOTE(review): jsonpickle restores arbitrary Python objects from the blob
        # content; this presumably relies on the container being written only by
        # trusted code - confirm.
        result = Unpickler().restore(item)
        return result