Spaces:
Sleeping
Sleeping
from collections import Counter, defaultdict | |
from typing import List | |
import numpy as np | |
def get_servers_metrics(model_reports) -> List[str]:
    """Build server-level metric lines in the Prometheus text exposition format.

    Aggregates over every server row of every model report:

    - ``peers_per_srv``: average number of ``next_pings`` entries per counted server;
    - ``pings_inf_share``: share of ``next_pings`` entries equal to ``float("inf")``;
    - ``servers_num_total``: servers whose ``server_info.next_pings`` is not None;
    - ``servers_num_relay``: servers whose ``server_info.using_relay`` is truthy;
    - ``ping_pct{pct=...}``: percentiles of the finite ping values;
    - ``server_version{version_number=...}``: number of servers per reported version.

    Args:
        model_reports: iterable of model report dicts. Each has a ``"server_rows"``
            list whose items hold a ``"span"`` object with a ``server_info``
            attribute (servers with ``server_info`` None are skipped).

    Returns:
        List of metric lines, including ``#`` section comment lines.
    """
    servers_num_total = 0
    servers_num_relay = 0
    num_peers = 0  # every next_pings entry, finite or infinite
    num_ping_infs = 0
    pings = []  # finite ping values only
    version_counts = Counter()
    result = ["# SERVER LEVEL METRICS"]

    for model_report in model_reports:
        for server in model_report["server_rows"]:
            server_info = server["span"].server_info
            if server_info is None:
                continue

            next_pings = server_info.next_pings
            if next_pings is not None:
                servers_num_total += 1
                num_peers += len(next_pings)
                finite_pings = [v for v in next_pings.values() if v != float("inf")]
                pings.extend(finite_pings)
                num_ping_infs += len(next_pings) - len(finite_pings)

            if server_info.using_relay:
                servers_num_relay += 1

            version = server_info.version
            if version:
                version_counts[version] += 1

    if servers_num_total > 0:
        result.append(f"peers_per_srv {num_peers / servers_num_total:.1f}")
    if num_peers > 0:
        # Reported even when no ping is finite (share == 1.0): that is exactly
        # the situation this metric exists to surface.
        result.append(f"pings_inf_share {num_ping_infs / num_peers:.3f}")

    result.append(f"servers_num_total {servers_num_total}")
    result.append(f"servers_num_relay {servers_num_relay}")

    if pings:
        result.append("# PINGS")
        percentiles = (25, 50, 75, 90, 95)
        # One call computes all percentiles (np.percentile sorts internally).
        for pct, value in zip(percentiles, np.percentile(pings, percentiles)):
            result.append(f'ping_pct{{pct="{pct}"}} {value:.4f}')

    result.append("# VERSIONS")
    for version_number, version_count in version_counts.items():
        # Versions are reported by remote servers: escape \, " and newline as the
        # exposition format requires, so a crafted value cannot break or inject lines.
        label = str(version_number).replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
        result.append(f'server_version{{version_number="{label}"}} {version_count}')

    return result
def get_models_metrics(model_reports) -> List[str]:
    """Build per-model block-coverage and throughput metric lines (Prometheus format).

    For every model report, counts how many servers host each block (overall and
    per server state) and sums the ``network_rps`` / ``inference_rps`` /
    ``forward_rps`` values of the servers hosting each block.

    Args:
        model_reports: iterable of model report dicts with keys ``"dht_prefix"``
            (used as the ``model`` label), ``"num_blocks"`` and ``"server_rows"``.
            Each server row has a ``"state"`` and a ``"span"`` whose
            ``start``/``end`` delimit the hosted block range (end exclusive) and
            whose ``server_info`` may be None. Spans are assumed to lie within
            ``[0, num_blocks]``.

    Returns:
        List of metric lines, including a ``#`` comment line per model.
    """
    rps_names = ("network_rps", "inference_rps", "forward_rps")
    result = ["# MODEL LEVEL METRICS"]

    for model_report in model_reports:
        model_name = model_report["dht_prefix"]
        num_blocks = model_report["num_blocks"]
        server_rows = model_report["server_rows"]
        result.append(f"# MODEL: {model_name} {'-' * 50}")

        # Per-block accumulators keyed by "total", a server state or an rps name;
        # keys never touched read as all-zero arrays.
        blocks = defaultdict(lambda: np.zeros(num_blocks))
        for server in server_rows:
            span = server["span"]
            hosted = slice(span.start, span.end)  # vectorized update of the hosted range
            blocks["total"][hosted] += 1
            blocks[server["state"]][hosted] += 1
            if span.server_info is not None:
                for rps in rps_names:
                    rps_value = getattr(span.server_info, rps, 0)
                    if rps_value is not None:
                        blocks[rps][hosted] += rps_value

        result.extend(
            [
                f'n_blocks{{model="{model_name}"}} {num_blocks}',
                f'servers_num{{model="{model_name}"}} {len(server_rows)}',
                f'blocks_total{{model="{model_name}"}} {blocks["total"].sum()}',
                f'blocks_online_min{{model="{model_name}"}} {blocks["online"].min()}',
            ]
        )
        for block_state in ("online", "joining", "offline", "unreachable"):
            result.append(f'blocks{{model="{model_name}",state="{block_state}"}} {blocks[block_state].sum():.0f}')
        for rps in rps_names:
            rps_type = rps.split("_")[0]
            result.append(f'rps_avg{{model="{model_name}",rps="{rps_type}"}} {blocks[rps].mean():.1f}')
            result.append(f'rps_min{{model="{model_name}",rps="{rps_type}"}} {blocks[rps].min():.1f}')

    return result
def get_prometheus_metrics(state_dict) -> str:
    """Render all health metrics in the Prometheus text exposition format.

    Format description: https://prometheus.io/docs/instrumenting/exposition_formats/

    Args:
        state_dict: mapping with a ``"model_reports"`` entry (passed to the
            server- and model-level metric builders) and an optional numeric
            ``"update_duration"``.

    Returns:
        Multi-line string with a single metric (or ``#`` comment) per line,
        terminated by a line feed as the exposition format requires.
    """
    result = ["# GENERAL METRICS"]
    update_duration = state_dict.get("update_duration")
    # Omit the metric when absent: formatting None with ".1f" would raise TypeError.
    if update_duration is not None:
        result.append(f"update_duration {update_duration:.1f}")

    model_reports = state_dict["model_reports"]
    result.extend(get_servers_metrics(model_reports))
    result.extend(get_models_metrics(model_reports))
    # The exposition format requires the last line to end with "\n".
    return "\n".join(result) + "\n"