Spaces:
Build error
Build error
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
from unittest.mock import Mock | |
import azure.cosmos.errors as cosmos_errors | |
from azure.cosmos.cosmos_client import CosmosClient | |
import pytest | |
from botbuilder.core import StoreItem | |
from botbuilder.azure import CosmosDbStorage, CosmosDbConfig | |
from botbuilder.testing import StorageBaseTests | |
# local cosmosdb emulator instance cosmos_db_config | |
COSMOS_DB_CONFIG = CosmosDbConfig( | |
endpoint="https://localhost:8081", | |
masterkey="C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==", | |
database="test-db", | |
container="bot-storage", | |
) | |
EMULATOR_RUNNING = False | |
def get_storage(): | |
return CosmosDbStorage(COSMOS_DB_CONFIG) | |
async def reset(): | |
storage = CosmosDbStorage(COSMOS_DB_CONFIG) | |
try: | |
storage.client.DeleteDatabase(database_link="dbs/" + COSMOS_DB_CONFIG.database) | |
except cosmos_errors.HTTPFailure: | |
pass | |
def get_mock_client(identifier: str = "1"): | |
# pylint: disable=attribute-defined-outside-init, invalid-name | |
mock = MockClient() | |
mock.QueryDatabases = Mock(return_value=[]) | |
mock.QueryContainers = Mock(return_value=[]) | |
mock.CreateDatabase = Mock(return_value={"id": identifier}) | |
mock.CreateContainer = Mock(return_value={"id": identifier}) | |
return mock | |
class MockClient(CosmosClient): | |
def __init__(self): # pylint: disable=super-init-not-called | |
pass | |
class SimpleStoreItem(StoreItem): | |
def __init__(self, counter=1, e_tag="*"): | |
super(SimpleStoreItem, self).__init__() | |
self.counter = counter | |
self.e_tag = e_tag | |
class TestCosmosDbStorageConstructor: | |
async def test_cosmos_storage_init_should_error_without_cosmos_db_config(self): | |
try: | |
CosmosDbStorage(CosmosDbConfig()) | |
except Exception as error: | |
assert error | |
async def test_creation_request_options_are_being_called(self): | |
# pylint: disable=protected-access | |
test_config = CosmosDbConfig( | |
endpoint="https://localhost:8081", | |
masterkey="C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==", | |
database="test-db", | |
container="bot-storage", | |
database_creation_options={"OfferThroughput": 1000}, | |
container_creation_options={"OfferThroughput": 500}, | |
) | |
test_id = "1" | |
client = get_mock_client(identifier=test_id) | |
storage = CosmosDbStorage(test_config, client) | |
storage.database = test_id | |
assert storage._get_or_create_database(doc_client=client, id=test_id), test_id | |
client.CreateDatabase.assert_called_with( | |
{"id": test_id}, test_config.database_creation_options | |
) | |
assert storage._get_or_create_container( | |
doc_client=client, container=test_id | |
), test_id | |
client.CreateContainer.assert_called_with( | |
"dbs/" + test_id, {"id": test_id}, test_config.container_creation_options | |
) | |
class TestCosmosDbStorageBaseStorageTests: | |
async def test_return_empty_object_when_reading_unknown_key(self): | |
await reset() | |
test_ran = await StorageBaseTests.return_empty_object_when_reading_unknown_key( | |
get_storage() | |
) | |
assert test_ran | |
async def test_handle_null_keys_when_reading(self): | |
await reset() | |
test_ran = await StorageBaseTests.handle_null_keys_when_reading(get_storage()) | |
assert test_ran | |
async def test_handle_null_keys_when_writing(self): | |
await reset() | |
test_ran = await StorageBaseTests.handle_null_keys_when_writing(get_storage()) | |
assert test_ran | |
async def test_does_not_raise_when_writing_no_items(self): | |
await reset() | |
test_ran = await StorageBaseTests.does_not_raise_when_writing_no_items( | |
get_storage() | |
) | |
assert test_ran | |
async def test_create_object(self): | |
await reset() | |
test_ran = await StorageBaseTests.create_object(get_storage()) | |
assert test_ran | |
async def test_handle_crazy_keys(self): | |
await reset() | |
test_ran = await StorageBaseTests.handle_crazy_keys(get_storage()) | |
assert test_ran | |
async def test_update_object(self): | |
await reset() | |
test_ran = await StorageBaseTests.update_object(get_storage()) | |
assert test_ran | |
async def test_delete_object(self): | |
await reset() | |
test_ran = await StorageBaseTests.delete_object(get_storage()) | |
assert test_ran | |
async def test_perform_batch_operations(self): | |
await reset() | |
test_ran = await StorageBaseTests.perform_batch_operations(get_storage()) | |
assert test_ran | |
async def test_proceeds_through_waterfall(self): | |
await reset() | |
test_ran = await StorageBaseTests.proceeds_through_waterfall(get_storage()) | |
assert test_ran | |
class TestCosmosDbStorage: | |
async def test_cosmos_storage_init_should_work_with_just_endpoint_and_key(self): | |
storage = CosmosDbStorage( | |
CosmosDbConfig( | |
endpoint=COSMOS_DB_CONFIG.endpoint, masterkey=COSMOS_DB_CONFIG.masterkey | |
) | |
) | |
await storage.write({"user": SimpleStoreItem()}) | |
data = await storage.read(["user"]) | |
assert "user" in data | |
assert data["user"].counter == 1 | |
assert len(data.keys()) == 1 | |
async def test_cosmos_storage_read_update_should_return_new_etag(self): | |
await reset() | |
storage = CosmosDbStorage(COSMOS_DB_CONFIG) | |
await storage.write({"test": SimpleStoreItem(counter=1)}) | |
data_result = await storage.read(["test"]) | |
data_result["test"].counter = 2 | |
await storage.write(data_result) | |
data_updated = await storage.read(["test"]) | |
assert data_updated["test"].counter == 2 | |
assert data_updated["test"].e_tag != data_result["test"].e_tag | |
async def test_cosmos_storage_read_with_invalid_key_should_return_empty_dict(self): | |
await reset() | |
storage = CosmosDbStorage(COSMOS_DB_CONFIG) | |
data = await storage.read(["test"]) | |
assert isinstance(data, dict) | |
assert not data.keys() | |
async def test_cosmos_storage_write_should_overwrite_when_new_e_tag_is_an_asterisk( | |
self, | |
): | |
await reset() | |
storage = CosmosDbStorage(COSMOS_DB_CONFIG) | |
await storage.write({"user": SimpleStoreItem()}) | |
await storage.write({"user": SimpleStoreItem(counter=10, e_tag="*")}) | |
data = await storage.read(["user"]) | |
assert data["user"].counter == 10 | |
async def test_cosmos_storage_delete_should_delete_multiple_values_when_given_multiple_valid_keys( | |
self, | |
): | |
await reset() | |
storage = CosmosDbStorage(COSMOS_DB_CONFIG) | |
await storage.write({"test": SimpleStoreItem(), "test2": SimpleStoreItem(2)}) | |
await storage.delete(["test", "test2"]) | |
data = await storage.read(["test", "test2"]) | |
assert not data.keys() | |
async def test_cosmos_storage_delete_should_delete_values_when_given_multiple_valid_keys_and_ignore_other_data( | |
self, | |
): | |
await reset() | |
storage = CosmosDbStorage(COSMOS_DB_CONFIG) | |
await storage.write( | |
{ | |
"test": SimpleStoreItem(), | |
"test2": SimpleStoreItem(counter=2), | |
"test3": SimpleStoreItem(counter=3), | |
} | |
) | |
await storage.delete(["test", "test2"]) | |
data = await storage.read(["test", "test2", "test3"]) | |
assert len(data.keys()) == 1 | |
async def test_cosmos_storage_delete_invalid_key_should_do_nothing_and_not_affect_cached_data( | |
self, | |
): | |
await reset() | |
storage = CosmosDbStorage(COSMOS_DB_CONFIG) | |
await storage.write({"test": SimpleStoreItem()}) | |
await storage.delete(["foo"]) | |
data = await storage.read(["test"]) | |
assert len(data.keys()) == 1 | |
data = await storage.read(["foo"]) | |
assert not data.keys() | |
async def test_cosmos_storage_delete_invalid_keys_should_do_nothing_and_not_affect_cached_data( | |
self, | |
): | |
await reset() | |
storage = CosmosDbStorage(COSMOS_DB_CONFIG) | |
await storage.write({"test": SimpleStoreItem()}) | |
await storage.delete(["foo", "bar"]) | |
data = await storage.read(["test"]) | |
assert len(data.keys()) == 1 | |