File size: 2,313 Bytes
0ad74ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
""" This module contains type hints for the anyio library. It was auto-generated so may include errors."""
from typing import Any, Callable, Coroutine, TypeVar, overload, Optional, Union
from types import TracebackType

# Generic result type of the callable handed to to_thread.run_sync().
T = TypeVar("T")
# Generic result type propagated through run().
T_Retval = TypeVar("T_Retval")

class CapacityLimiter:
    """Type stub for ``anyio.CapacityLimiter``.

    Constructed from a token count. Declares the asynchronous context-manager
    protocol (``async with limiter:``) alongside the synchronous pair that the
    generated stub already listed.
    """

    def __init__(self, total_tokens: float) -> None: ...

    async def acquire(self) -> None:
        """Coroutine form of acquisition; resolves to ``None``."""

    def acquire_nowait(self) -> None:
        """Plain (non-coroutine) form of acquisition; returns ``None``."""

    def release(self) -> None: ...

    @property
    def total_tokens(self) -> float: ...

    @property
    def available_tokens(self) -> float: ...

    async def __aenter__(self) -> None: ...

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None: ...

    # NOTE(review): anyio documents the limiter as an *async* context manager;
    # the synchronous pair below is retained only so existing users of this
    # stub keep type-checking — confirm against anyio and drop if unused.
    def __enter__(self) -> 'CapacityLimiter': ...

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None: ...

class to_thread:
    """Namespace standing in for the ``anyio.to_thread`` module."""

    @staticmethod
    async def run_sync(
        func: Callable[..., T],
        *args: Any,
        cancellable: bool = False,
        limiter: Optional[CapacityLimiter] = None,
        **kwargs: Any,
    ) -> T:
        """Coroutine stub: awaiting it is typed as yielding ``func``'s result.

        Written as ``async def ... -> T`` to match the other coroutine stubs
        in this file; for callers this is the same type as a plain ``def``
        annotated ``-> Coroutine[Any, Any, T]``.

        NOTE(review): anyio documents only positional ``*args`` being
        forwarded to ``func``; ``**kwargs`` is kept for compatibility with
        existing users of this stub — confirm and remove if unused.
        """

# Stub for ``anyio.run``. ``func`` is a coroutine function; ``run`` is typed as
# returning the value that coroutine resolves to, so ``func`` must be annotated
# as returning ``Coroutine[Any, Any, T_Retval]`` for ``T_Retval`` to be
# inferred as the awaited result rather than the coroutine object itself.
# NOTE(review): if this file is a regular ``.py`` module rather than a ``.pyi``
# stub, type checkers will also expect an implementation after the overloads.

# Overload 1: ``func`` takes no arguments.
@overload
def run(func: Callable[[], Coroutine[Any, Any, T_Retval]], *, backend: Optional[str] = ...,
        backend_options: Optional[dict[str, Any]] = ...) -> T_Retval: ...

# Overload 2: extra arguments are forwarded to ``func``.
# NOTE(review): anyio documents positional ``*args`` only; ``**kwargs`` is kept
# for compatibility with existing users of this stub — confirm and remove.
@overload
def run(func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: Any, backend: Optional[str] = ...,
        backend_options: Optional[dict[str, Any]] = ..., **kwargs: Any) -> T_Retval: ...

async def sleep(delay: float) -> None:
    """Stub for ``anyio.sleep``: a coroutine taking ``delay``, resolving to ``None``.

    Declared with ``async def`` like the other coroutine stubs in this file,
    so calling it really produces an awaitable. ``delay`` is annotated
    ``float``; under PEP 484 type checkers accept ``int`` wherever ``float``
    is expected, so integer delays still type-check.
    """

async def sleep_forever() -> None:
    """Stub for ``anyio.sleep_forever``: an argument-less coroutine typed as ``None``."""

def current_time() -> float:
    """Stub for ``anyio.current_time``: takes no arguments, typed as returning ``float``."""

def get_cancelled_exc_class() -> type[BaseException]:
    """Stub for ``anyio.get_cancelled_exc_class``: typed as returning a ``BaseException`` subclass."""

def create_task_group() -> 'TaskGroup':
    """Stub for ``anyio.create_task_group``: takes no arguments, typed as returning a ``TaskGroup``."""

class TaskGroup:
    """Type stub for anyio's task group, entered with ``async with``."""

    async def __aenter__(self) -> 'TaskGroup': ...

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]: ...

    def start_soon(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any,
                   name: Optional[str] = None, **kwargs: Any) -> None:
        """Plain (non-coroutine) call taking a coroutine function; returns ``None``.

        NOTE(review): anyio documents only positional ``*args`` being passed
        on to ``func``; ``**kwargs`` is kept for compatibility with existing
        users of this stub — confirm and remove if unused.
        """

    async def start(self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any,
                    name: object = None) -> Any:
        """Coroutine counterpart of :meth:`start_soon` that resolves to a value.

        Present on anyio's task group but absent from the generated stub.
        """

def create_memory_object_stream(
    max_buffer_size: float = 0
) -> tuple['MemoryObjectSender', 'MemoryObjectReceiver']:
    """Stub for ``anyio.create_memory_object_stream``.

    Typed as returning a ``(MemoryObjectSender, MemoryObjectReceiver)`` pair.

    ``max_buffer_size`` is annotated ``float`` rather than ``int`` so that
    ``math.inf`` (which anyio accepts for an unbounded buffer) type-checks;
    integer sizes remain valid because ``int`` is accepted wherever ``float``
    is expected.
    """

class MemoryObjectSender:
    """Stub for the first element of the pair typed by ``create_memory_object_stream``."""

    async def send(self, item: Any) -> None:
        """Coroutine form: takes any item, resolves to ``None``."""

    def send_nowait(self, item: Any) -> None:
        """Plain-call form: takes any item, returns ``None``."""

class MemoryObjectReceiver:
    """Stub for the second element of the pair typed by ``create_memory_object_stream``."""

    async def receive(self) -> Any:
        """Coroutine form: takes no arguments, typed as resolving to ``Any``."""

    def receive_nowait(self) -> Any:
        """Plain-call form: takes no arguments, typed as returning ``Any``."""