import json import os import logging from datetime import datetime import src.envs as envs from src.backend.manage_requests import EvalRequest from src.backend.evaluate_model import Evaluator # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.getLogger("openai").setLevel(logging.WARNING) def run_evaluation(eval_request: EvalRequest, batch_size, device, local_dir: str, results_repo: str, no_cache=True, limit=None): """ Run the evaluation for a given model and upload the results. Args: eval_request (EvalRequest): The evaluation request object containing model details. num_fewshot (int): Number of few-shot examples. batch_size (int): Batch size for processing. device (str): The device to run the evaluation on. local_dir (str): Local directory path for saving results. results_repo (str): Repository ID where results will be uploaded. no_cache (bool): Whether to disable caching. limit (int, optional): Limit on the number of items to process. Use with caution. Returns: dict: A dictionary containing evaluation results. """ if limit: logging.warning("WARNING: --limit SHOULD ONLY BE USED FOR TESTING. REAL METRICS SHOULD NOT BE COMPUTED USING LIMIT.") try: evaluator = Evaluator(eval_request.model, eval_request.revision, eval_request.precision, batch_size, device, no_cache, limit, write_out=True, output_base_path='logs') results = evaluator.evaluate() except Exception as e: logging.error(f"Error during evaluation: {e}") raise dumped = json.dumps(results, indent=2) logging.info(dumped) output_path = os.path.join(local_dir, *eval_request.model.split("/"), f"results_{datetime.now()}.json") os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, "w") as f: f.write(dumped) envs.API.upload_file( path_or_fileobj=output_path, path_in_repo=f"{eval_request.model}/results_{datetime.now()}.json", repo_id=results_repo, repo_type="dataset", ) return results