Spaces:
Sleeping
Sleeping
import logging | |
import selectors | |
import sys | |
from functools import cache | |
from .distributed import global_leader_only | |
_logger = logging.getLogger(__name__) | |
def _get_stdin_selector(): | |
selector = selectors.DefaultSelector() | |
selector.register(fileobj=sys.stdin, events=selectors.EVENT_READ) | |
return selector | |
def non_blocking_input(): | |
s = "" | |
selector = _get_stdin_selector() | |
events = selector.select(timeout=0) | |
for key, _ in events: | |
s: str = key.fileobj.readline().strip() | |
_logger.info(f'Get stdin "{s}".') | |
return s | |