geored's picture
Upload folder using huggingface_hub
add8f0b verified
raw
history blame
2.68 kB
"""Various base classes."""
from types import coroutine
from collections.abc import Coroutine
from asyncio import get_running_loop
class AsyncBase:
def __init__(self, file, loop, executor):
self._file = file
self._executor = executor
self._ref_loop = loop
@property
def _loop(self):
return self._ref_loop or get_running_loop()
def __aiter__(self):
"""We are our own iterator."""
return self
def __repr__(self):
return super().__repr__() + " wrapping " + repr(self._file)
async def __anext__(self):
"""Simulate normal file iteration."""
line = await self.readline()
if line:
return line
else:
raise StopAsyncIteration
class AsyncIndirectBase(AsyncBase):
def __init__(self, name, loop, executor, indirect):
self._indirect = indirect
self._name = name
super().__init__(None, loop, executor)
@property
def _file(self):
return self._indirect()
@_file.setter
def _file(self, v):
pass # discard writes
class _ContextManager(Coroutine):
__slots__ = ("_coro", "_obj")
def __init__(self, coro):
self._coro = coro
self._obj = None
def send(self, value):
return self._coro.send(value)
def throw(self, typ, val=None, tb=None):
if val is None:
return self._coro.throw(typ)
elif tb is None:
return self._coro.throw(typ, val)
else:
return self._coro.throw(typ, val, tb)
def close(self):
return self._coro.close()
@property
def gi_frame(self):
return self._coro.gi_frame
@property
def gi_running(self):
return self._coro.gi_running
@property
def gi_code(self):
return self._coro.gi_code
def __next__(self):
return self.send(None)
@coroutine
def __iter__(self):
resp = yield from self._coro
return resp
def __await__(self):
resp = yield from self._coro
return resp
async def __anext__(self):
resp = await self._coro
return resp
async def __aenter__(self):
self._obj = await self._coro
return self._obj
async def __aexit__(self, exc_type, exc, tb):
self._obj.close()
self._obj = None
class AiofilesContextManager(_ContextManager):
"""An adjusted async context manager for aiofiles."""
async def __aexit__(self, exc_type, exc_val, exc_tb):
await get_running_loop().run_in_executor(
None, self._obj._file.__exit__, exc_type, exc_val, exc_tb
)
self._obj = None