"""
This module implements a hyperparameter optimization routine for the embedding application. It utilizes TPE optimization from Optuna.

Each run, the optimizer will set the default values inside the hyperparameters. At the end, it will output the best ones it has found.
"""
import hashlib
import json
import logging
import re
from pathlib import Path

import gradio as gr
import numpy as np
import optuna

import extensions.superboogav2.parameters as parameters
from modules.logging_colors import logger

from .benchmark import benchmark
from .parameters import Parameters

# Optuna logs every trial at INFO level by default; keep the console quiet.
logging.getLogger('optuna').setLevel(logging.WARNING)


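# For reference, each entry in Parameters.getInstance().hyperparameters is
# assumed to look roughly like this (illustrative sketch; the key name and
# values below are made up, see parameters.py for the real schema):
#
#     'chunk_len': {
#         'default': 700,                      # value written back by _set_hyperparameters()
#         'categories': [500, 600, 700, 800],  # search space for trial.suggest_categorical()
#         'should_optimize': True,             # checked by _is_optimization_param()
#     }

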
# Format the parameters into markdown format.
def _markdown_hyperparams():
    res = []
    for param_name, param_value in Parameters.getInstance().hyperparameters.items():
        # Escape any markdown syntax. The hyphen is escaped inside the character
        # class so it matches literally instead of forming a range.
        param_name = re.sub(r"([_*\[\]()~`>#+\-.!])", r"\\\1", param_name)
        # Use an explicit None check so falsy defaults such as 0 or False still render.
        param_value_default = re.sub(r"([_*\[\]()~`>#+\-.!])", r"\\\1", str(param_value['default'])) if param_value['default'] is not None else ' '

        res.append(f'* {param_name}: **{param_value_default}**')

    return '\n'.join(res)

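# With the escaping above, a hypothetical entry named 'chunk_len' with default
# 700 would render as:
#
#     * chunk\_len: **700**
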

# Convert numpy scalar types to native Python types so they can be JSON-serialized.
def _convert_np_types(params):
    for key in params:
        if isinstance(params[key], np.bool_):
            params[key] = bool(params[key])
        elif isinstance(params[key], np.int64):
            params[key] = int(params[key])
        elif isinstance(params[key], np.float64):
            params[key] = float(params[key])
    return params


# Write the given values into the hyperparameters' defaults.
def _set_hyperparameters(params):
    for param_name, param_value in params.items():
        if param_name in Parameters.getInstance().hyperparameters:
            Parameters.getInstance().hyperparameters[param_name]['default'] = param_value


# Check if the parameter is marked for optimization.
def _is_optimization_param(val):
    # A missing 'should_optimize' key is treated the same as False.
    return val.get('should_optimize', False)


# Create a stable hash of the parameters, used as a cache key.
def _get_params_hash(params):
    params_str = json.dumps(params, sort_keys=True)
    return hashlib.sha256(params_str.encode()).hexdigest()


def optimize(collector, progress=gr.Progress()):
    # Inform the user that something is happening.
    progress(0, desc='Setting Up...')

    # Track the current step
    current_step = 0

    # Track the best score
    best_score = 0

    # Dictionary for caching scores
    scores_cache = {}

    def objective_function(trial):
        nonlocal current_step
        nonlocal best_score

        params = {}
        for key, val in Parameters.getInstance().hyperparameters.items():
            if _is_optimization_param(val):
                params[key] = trial.suggest_categorical(key, val['categories'])

        _set_hyperparameters(params)

        params_hash = _get_params_hash(params)

        # If the score for these parameters is already cached, return it,
        # negated to match the sign of a fresh evaluation below.
        if params_hash in scores_cache:
            return -scores_cache[params_hash]

        # Benchmark the current set of parameters.
        score, max_score = benchmark(Path("extensions/superboogav2/benchmark_texts/questions.json"), collector)

        # Cache the score
        scores_cache[params_hash] = score

        result = json.dumps(_convert_np_types(params), indent=4)
        result += f'\nScore: {score}/{max_score}'

        logger.debug(result)

        # Increment the current step
        current_step += 1

        # Update the best score
        best_score = max(best_score, score)

        # Update the progress
        progress(current_step / parameters.get_optimization_steps(), desc=f'Optimizing... {current_step}/{parameters.get_optimization_steps()}')

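        # The Optuna study minimizes, so return the negated score to maximize it.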
        return -score

    # Run the optimization. create_study() defaults to the TPE sampler and minimizes.
    study = optuna.create_study()
    study.optimize(objective_function, n_trials=int(parameters.get_optimization_steps()))

    best_params = study.best_params
    _set_hyperparameters(best_params)

    # Convert results to a markdown string.
    str_result = f"## Best parameters:\n\n{_markdown_hyperparams()}\n\n## Score:\n\n{best_score}"

    # Save to JSON file
    with open('best_params.json', 'w') as fp:
        json.dump(_convert_np_types(best_params), fp, indent=4)

    return str_result
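

# Usage sketch (illustrative; `collector` is built elsewhere by the extension,
# typically its ChromaDB collector):
#
#     report = optimize(collector)  # blocks until all trials finish
#     print(report)                 # markdown summary of the best parameters and score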