from frozenlist import FrozenList | |
# Names exported by a star-import of this module.
__all__ = ("Signal",)
# Version string for this module.
__version__ = "1.3.1"
class Signal(FrozenList):
    """Coroutine-based signal backed by a freezable list of receivers.

    Receivers are connected with any list method inherited from
    FrozenList.  The signal is fired by awaiting ``send()``, which passes
    its positional and keyword arguments on to every receiver.
    """

    __slots__ = ("_owner",)

    def __init__(self, owner):
        """Create an empty signal and store *owner* on the instance."""
        super().__init__()
        self._owner = owner

    def __repr__(self):
        """Return a debug string showing owner, frozen flag and receivers."""
        template = "<Signal owner={}, frozen={}, {!r}>"
        owner = self._owner
        is_frozen = self.frozen
        receivers = list(self)
        return template.format(owner, is_frozen, receivers)

    async def send(self, *args, **kwargs):
        """Await every registered receiver, one after another, in list order.

        Each receiver is called with the same ``*args`` and ``**kwargs``
        and its result is awaited before the next receiver is called.

        Raises:
            RuntimeError: if the signal is not frozen.
        """
        if self.frozen:
            for callback in self:
                await callback(*args, **kwargs)  # type: ignore
            return
        raise RuntimeError("Cannot send non-frozen signal.")