File size: 4,185 Bytes
add8f0b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
from __future__ import annotations
import enum
from dataclasses import dataclass
from typing import Any, Generic, Literal, TypeVar, overload
from weakref import WeakKeyDictionary
from ._core._eventloop import get_async_backend
T = TypeVar("T")
D = TypeVar("D")
async def checkpoint() -> None:
"""
Check for cancellation and allow the scheduler to switch to another task.
Equivalent to (but more efficient than)::
await checkpoint_if_cancelled()
await cancel_shielded_checkpoint()
.. versionadded:: 3.0
"""
await get_async_backend().checkpoint()
async def checkpoint_if_cancelled() -> None:
"""
Enter a checkpoint if the enclosing cancel scope has been cancelled.
This does not allow the scheduler to switch to a different task.
.. versionadded:: 3.0
"""
await get_async_backend().checkpoint_if_cancelled()
async def cancel_shielded_checkpoint() -> None:
"""
Allow the scheduler to switch to another task but without checking for cancellation.
Equivalent to (but potentially more efficient than)::
with CancelScope(shield=True):
await checkpoint()
.. versionadded:: 3.0
"""
await get_async_backend().cancel_shielded_checkpoint()
def current_token() -> object:
"""
Return a backend specific token object that can be used to get back to the event
loop.
"""
return get_async_backend().current_token()
_run_vars: WeakKeyDictionary[Any, dict[str, Any]] = WeakKeyDictionary()
_token_wrappers: dict[Any, _TokenWrapper] = {}
@dataclass(frozen=True)
class _TokenWrapper:
__slots__ = "_token", "__weakref__"
_token: object
class _NoValueSet(enum.Enum):
NO_VALUE_SET = enum.auto()
class RunvarToken(Generic[T]):
__slots__ = "_var", "_value", "_redeemed"
def __init__(self, var: RunVar[T], value: T | Literal[_NoValueSet.NO_VALUE_SET]):
self._var = var
self._value: T | Literal[_NoValueSet.NO_VALUE_SET] = value
self._redeemed = False
class RunVar(Generic[T]):
"""
Like a :class:`~contextvars.ContextVar`, except scoped to the running event loop.
"""
__slots__ = "_name", "_default"
NO_VALUE_SET: Literal[_NoValueSet.NO_VALUE_SET] = _NoValueSet.NO_VALUE_SET
_token_wrappers: set[_TokenWrapper] = set()
def __init__(
self, name: str, default: T | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
):
self._name = name
self._default = default
@property
def _current_vars(self) -> dict[str, T]:
token = current_token()
try:
return _run_vars[token]
except KeyError:
run_vars = _run_vars[token] = {}
return run_vars
@overload
def get(self, default: D) -> T | D:
...
@overload
def get(self) -> T:
...
def get(
self, default: D | Literal[_NoValueSet.NO_VALUE_SET] = NO_VALUE_SET
) -> T | D:
try:
return self._current_vars[self._name]
except KeyError:
if default is not RunVar.NO_VALUE_SET:
return default
elif self._default is not RunVar.NO_VALUE_SET:
return self._default
raise LookupError(
f'Run variable "{self._name}" has no value and no default set'
)
def set(self, value: T) -> RunvarToken[T]:
current_vars = self._current_vars
token = RunvarToken(self, current_vars.get(self._name, RunVar.NO_VALUE_SET))
current_vars[self._name] = value
return token
def reset(self, token: RunvarToken[T]) -> None:
if token._var is not self:
raise ValueError("This token does not belong to this RunVar")
if token._redeemed:
raise ValueError("This token has already been used")
if token._value is _NoValueSet.NO_VALUE_SET:
try:
del self._current_vars[self._name]
except KeyError:
pass
else:
self._current_vars[self._name] = token._value
token._redeemed = True
def __repr__(self) -> str:
return f"<RunVar name={self._name!r}>"
|