api_for_unity / metrics.py
ldhldh's picture
Upload 8 files
90e26fa
raw
history blame
4.44 kB
from collections import Counter, defaultdict
from typing import List
import numpy as np
def get_servers_metrics(model_reports) -> List[str]:
servers_num_total = 0
servers_num_relay = 0
num_peers = 0
pings = []
num_ping_infs = 0
version_counts = Counter()
result = ["# SERVER LEVEL METRICS"]
for model_reports in model_reports:
for server in model_reports["server_rows"]:
if server["span"].server_info is not None:
next_pings = server["span"].server_info.next_pings
if next_pings is not None:
servers_num_total += 1
num_peers += len(next_pings)
pings_not_inf = [v for k, v in next_pings.items() if v != float("inf")]
pings.extend(pings_not_inf)
num_ping_infs += len([v for v in next_pings.values() if v == float("inf")])
if server["span"].server_info.using_relay:
servers_num_relay += 1
version = server["span"].server_info.version
if version:
version_counts[version] += 1
if servers_num_total > 0 and pings:
peers_per_srv = (len(pings) + num_ping_infs) / servers_num_total
pings_inf_share = num_ping_infs / (num_ping_infs + len(pings))
result.extend(
[
f"peers_per_srv {peers_per_srv:.1f}",
f"pings_inf_share {pings_inf_share:.3f}",
]
)
result.append(f"servers_num_total {servers_num_total}")
result.append(f"servers_num_relay {servers_num_relay}")
if pings:
result.append("# PINGS")
pings = np.sort(pings).tolist()
for pct in (25, 50, 75, 90, 95):
result.append(f'ping_pct{{pct="{pct}"}} {np.percentile(pings, pct):.4f}')
result.append("# VERSIONS")
for version_number, version_count in version_counts.items():
result.append(f'server_version{{version_number="{version_number}"}} {version_count}')
return result
def get_models_metrics(model_reports) -> List[str]:
result = [
"# MODEL LEVEL METRICS",
]
for model_reports in model_reports:
model_name = model_reports["dht_prefix"]
result.append(f"# MODEL: {model_name} {'-' * 50}")
blocks = defaultdict(lambda: np.zeros(model_reports["num_blocks"]))
for server in model_reports["server_rows"]:
for block_idx in range(server["span"].start, server["span"].end):
blocks["total"][block_idx] += 1
blocks[server["state"]][block_idx] += 1
if server["span"].server_info is not None:
for rps in ("network_rps", "inference_rps", "forward_rps"):
rps_value = getattr(server["span"].server_info, rps, 0)
if rps_value is not None:
blocks[rps][block_idx] += rps_value
result.extend(
[
f'n_blocks{{model="{model_name}"}} {model_reports["num_blocks"]}',
f'servers_num{{model="{model_name}"}} {len(model_reports["server_rows"])}',
f'blocks_total{{model="{model_name}"}} {blocks["total"].sum()}',
f'blocks_online_min{{model="{model_name}"}} {blocks["online"].min()}',
]
)
for block_state in ("online", "joining", "offline", "unreachable"):
result.append(f'blocks{{model="{model_name}",state="{block_state}"}} {blocks[block_state].sum():.0f}')
for rps in ("network_rps", "inference_rps", "forward_rps"):
rps_type = rps.split("_")[0]
result.append(f'rps_avg{{model="{model_name}",rps="{rps_type}"}} {blocks[rps].mean():.1f}')
result.append(f'rps_min{{model="{model_name}",rps="{rps_type}"}} {blocks[rps].min():.1f}')
return result
def get_prometheus_metrics(state_dict) -> str:
"""prepares metrics in Prometeus format
description: https://prometheus.io/docs/instrumenting/exposition_formats/
returns multline string with single metric per line
"""
result = []
result.append("# GENERAL METRICS")
result.append(f"update_duration {state_dict.get('update_duration', None):.1f}")
result.extend(get_servers_metrics(state_dict["model_reports"]))
result.extend(get_models_metrics(state_dict["model_reports"]))
return "\n".join(result)