Spaces:
Runtime error
Runtime error
import os | |
import random | |
import subprocess | |
import pandas as pd | |
from datetime import datetime | |
from huggingface_hub import HfApi, Repository | |
from utils import * | |
DATASET_REPO_URL = "https://huggingface.co/datasets/huggingface-projects/bot-fight-data" | |
DATASET_TEMP_REPO_URL = "https://huggingface.co/datasets/huggingface-projects/temp-match-results" | |
ELO_FILENAME = "soccer_elo.csv" | |
HISTORY_FILENAME = "soccer_history.csv" | |
TEMP_FILENAME = "results.csv" | |
ELO_DIR = "soccer_elo" | |
TEMP_DIR = "temp" | |
HF_TOKEN = os.environ.get("HF_TOKEN") | |
repo = Repository( | |
local_dir=ELO_DIR, clone_from=DATASET_REPO_URL, use_auth_token=HF_TOKEN | |
) | |
repo_temp = Repository( | |
local_dir=TEMP_DIR, clone_from=DATASET_TEMP_REPO_URL, use_auth_token=HF_TOKEN | |
) | |
api = HfApi() | |
os.chmod('./SoccerTows.x86_64', 0o755) | |
class Model: | |
""" | |
Class containing the info of a model. | |
:param name: Name of the model | |
:param elo: Elo rating of the model | |
:param games_played: Number of games played by the model (useful if we implement sigma uncertainty) | |
""" | |
def __init__(self, author, name, elo=1200, games_played=0): | |
self.author = author | |
self.name = name | |
self.elo = elo | |
self.games_played = games_played | |
class Matchmaking: | |
""" | |
Class managing the matchmaking between the models. | |
:param models: List of models | |
:param queue: Temporary list of models used for the matching process | |
:param k: Dev coefficient | |
:param max_diff: Maximum difference considered between two models' elo | |
:param matches: Dictionary containing the match history (to later upload as CSV) | |
""" | |
def __init__(self, models): | |
self.models = models | |
self.queue = self.models.copy() | |
self.k = 20 | |
self.max_diff = 500 | |
self.matches = { | |
"model1": [], | |
"model2": [], | |
"timestamp": [], | |
"result": [], | |
} | |
def run(self): | |
""" | |
Run the matchmaking process. | |
Add models to the queue, shuffle it, and match the models one by one to models with close ratings. | |
Compute the new elo for each model after each match and add the match to the match history. | |
""" | |
self.queue = self.models.copy() | |
random.shuffle(self.queue) | |
while len(self.queue) > 1: | |
print(f"Queue length: {len(self.queue)}") | |
model1 = self.queue.pop(0) | |
model2 = self.queue.pop(self.find_n_closest_indexes(model1, 10)) | |
match(model1, model2) | |
self.load_results() | |
def load_results(self): | |
""" Load the match history from the hub. """ | |
repo.git_pull() | |
results = pd.read_csv( | |
"https://huggingface.co/datasets/huggingface-projects/temp-match-results/raw/main/results.csv" | |
) | |
# while len(results) < len(self.matches["model1"]): | |
# time.sleep(60) | |
# results = pd.read_csv( | |
# "https://huggingface.co/datasets/huggingface-projects/temp-match-results/raw/main/results.csv" | |
# ) | |
for i, row in results.iterrows(): | |
model1 = row["model1"].split("/") | |
model2 = row["model2"].split("/") | |
model1 = self.find_model(model1[0], model1[1]) | |
model2 = self.find_model(model2[0], model2[1]) | |
result = row["result"] | |
if model1 is not None or model2 is not None: | |
self.compute_elo(model1, model2, row["result"]) | |
self.matches["model1"].append(model1.author + "/" + model1.name) | |
self.matches["model2"].append(model2.author + "/" + model2.name) | |
self.matches["result"].append(result) | |
self.matches["timestamp"].append(row["timestamp"]) | |
data_dict = {"model1": [], "model2": [], "timestamp": [], "result": []} | |
df = pd.DataFrame(data_dict) | |
print(df.head()) | |
repo_temp.git_pull() | |
df.to_csv(os.path.join(TEMP_DIR, TEMP_FILENAME), index=False) | |
repo_temp.push_to_hub(commit_message="Reset results.csv") | |
def find_model(self, author, name): | |
""" Find a model in the models list. """ | |
for model in self.models: | |
if model.author == author and model.name == name: | |
return model | |
return None | |
def compute_elo(self, model1, model2, result): | |
""" Compute the new elo for each model based on a match result. """ | |
delta = model1.elo - model2.elo | |
win_probability = 1 / (1 + 10 ** (-delta / 500)) | |
model1.elo += self.k * (result - win_probability) | |
model2.elo -= self.k * (result - win_probability) | |
def find_n_closest_indexes(self, model, n) -> int: | |
""" | |
Get a model index with a fairly close rating. If no model is found, return the last model in the queue. | |
We don't always pick the closest rating to add variety to the matchups. | |
:param model: Model to compare | |
:param n: Number of close models from which to pick a candidate | |
:return: id of the chosen candidate | |
""" | |
if len(self.queue) == 1: | |
return 0 | |
indexes = [] | |
closest_diffs = [9999999] * n | |
for i, m in enumerate(self.queue): | |
if m.name == model.name: | |
continue | |
diff = abs(m.elo - model.elo) | |
if diff < max(closest_diffs): | |
closest_diffs.append(diff) | |
closest_diffs.sort() | |
closest_diffs.pop() | |
indexes.append(i) | |
random.shuffle(indexes) | |
return indexes[0] | |
def to_csv(self): | |
""" Save the match history as a CSV file to the hub. """ | |
data_dict = {"rank": [], "author": [], "model": [], "elo": [], "games_played": []} | |
sorted_models = sorted(self.models, key=lambda x: x.elo, reverse=True) | |
for i, model in enumerate(sorted_models): | |
data_dict["rank"].append(i + 1) | |
data_dict["author"].append(model.author) | |
data_dict["model"].append(model.name) | |
data_dict["elo"].append(model.elo) | |
data_dict["games_played"].append(model.games_played) | |
df = pd.DataFrame(data_dict) | |
print(df.head()) | |
repo.git_pull() | |
history = pd.read_csv(os.path.join(ELO_DIR, HISTORY_FILENAME)) | |
new_history = pd.DataFrame(self.matches) | |
history = pd.concat([history, new_history]) | |
history.to_csv(os.path.join(ELO_DIR, HISTORY_FILENAME), index=False) | |
df.to_csv(os.path.join(ELO_DIR, ELO_FILENAME), index=False) | |
repo.push_to_hub(commit_message="Update ELO") | |
def match(model1, model2): | |
""" | |
Simulate a match between two models using the Unity environment. | |
:param model1: First Model object | |
:param model2: Second Model object | |
:return: match result (0: model1 lost, 0.5: draw, 1: model1 won) | |
""" | |
model1_id = model1.author + "/" + model1.name | |
model2_id = model2.author + "/" + model2.name | |
subprocess.run(["./SoccerTows.x86_64", "-model1", model1_id, "-model2", model2_id, "-nographics", "-batchmode"]) | |
print(f"Match {model1_id} against {model2_id} ended.") | |
model1.games_played += 1 | |
model2.games_played += 1 | |
def get_models_list() -> list: | |
""" | |
Get the list of models from the hub and the ELO file. | |
:return: list of Model objects | |
""" | |
models = [] | |
models_ids = [] | |
data = pd.read_csv(os.path.join(DATASET_REPO_URL, "resolve", "main", ELO_FILENAME)) | |
models_on_hub = api.list_models(filter=["reinforcement-learning", "ml-agents", "ML-Agents-SoccerTwos"]) | |
for i, row in data.iterrows(): | |
models.append(Model(row["author"], row["model"], row["elo"], row["games_played"])) | |
models_ids.append(row["author"] + "/" + row["model"]) | |
for model in models_on_hub: | |
author, name = model.modelId.split("/")[0], model.modelId.split("/")[1] | |
if model.modelId not in models_ids: | |
models.append(Model(author, name)) | |
print("New model found: ", author, "-", name) | |
return models | |
def get_elo_data() -> pd.DataFrame: | |
""" | |
Get the ELO data from the hub for all the models that have played at least one game. | |
:return: ELO data as a pandas DataFrame | |
""" | |
repo.git_pull() | |
data = pd.read_csv(os.path.join(DATASET_REPO_URL, "resolve", "main", ELO_FILENAME)) | |
return data | |
def init_matchmaking(): | |
""" | |
Run the matchmaking algorithm and save the results to the hub. | |
1. Get the list of models from the hub and the ELO data | |
2. Match models together based on their ELO rating | |
3. Simulate the matches using Unity to get the match result | |
4. Compute the new ELO rating for each model | |
5. Save the results to the hub | |
""" | |
models = get_models_list() | |
matchmaking = Matchmaking(models) | |
matchmaking.run() | |
matchmaking.to_csv() | |
print("Matchmaking done --", datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")) | |