from __future__ import annotations | |
from abc import ABCMeta, abstractmethod | |
from types import TracebackType | |
from typing import TypeVar | |
T = TypeVar("T") | |
class AsyncResource(metaclass=ABCMeta): | |
""" | |
Abstract base class for all closeable asynchronous resources. | |
Works as an asynchronous context manager which returns the instance itself on enter, | |
and calls :meth:`aclose` on exit. | |
""" | |
async def __aenter__(self: T) -> T: | |
return self | |
async def __aexit__( | |
self, | |
exc_type: type[BaseException] | None, | |
exc_val: BaseException | None, | |
exc_tb: TracebackType | None, | |
) -> None: | |
await self.aclose() | |
async def aclose(self) -> None: | |
"""Close the resource.""" | |