Spaces:
Running
on
T4
Running
on
T4
from logging import Logger | |
import time | |
class StreamingConnectionInfo: | |
def __init__(self, address, active_connections, latest_message_received_timestamp): | |
self.address = address | |
self.active_connections = active_connections | |
self.latest_message_received_timestamp = latest_message_received_timestamp | |
def __repr__(self): | |
return str(self) | |
def __str__(self): | |
return str( | |
{ | |
"address": self.address, | |
"active_connections": self.active_connections, | |
"latest_message_received_timestamp": self.latest_message_received_timestamp, | |
} | |
) | |
class ConnectionTracker: | |
def __init__(self, logger: Logger): | |
self.connections = dict() | |
self.logger = logger | |
def __str__(self): | |
return str(self.connections) | |
def add_connection(self, address): | |
if address not in self.connections: | |
self.connections[address] = StreamingConnectionInfo(address, 1, time.time()) | |
else: | |
self.connections[address].active_connections += 1 | |
self.connections[address].latest_message_received_timestamp = time.time() | |
def log_recent_message(self, address): | |
if address in self.connections: | |
self.connections[address].latest_message_received_timestamp = time.time() | |
else: | |
self.logger.warning( | |
f"Address {address} not found in connection tracker when attempting to log recent message" | |
) | |
def remove_connection(self, address): | |
if address in self.connections: | |
self.connections[address].active_connections -= 1 | |
if self.connections[address].active_connections < 0: | |
self.logger.warning( | |
f"Address {address} has negative active connections ({self.connections[address].active_connections})" | |
) | |
if self.connections[address].active_connections <= 0: | |
del self.connections[address] | |
else: | |
self.logger.warning( | |
f"Address {address} not found in connection tracker when attempting to remove it" | |
) | |
def get_active_connection_count(self): | |
return sum( | |
[connection.active_connections for connection in self.connections.values()] | |
) | |