File size: 4,448 Bytes
40e38d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import os
import json
import tempfile
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict
from datatrove.io import get_datafolder
from datatrove.utils.stats import MetricStatsDict
import gradio as gr
import tenacity

def find_folders(base_folder: str, path: str) -> List[str]:
    """Return the sorted full names of directories directly under `path`.

    Yields an empty list when `path` does not exist in `base_folder`.
    The listing entry for `path` itself (if present) is excluded.
    """
    data_folder = get_datafolder(base_folder)
    if not data_folder.exists(path):
        return []
    entries = data_folder.ls(path, detail=True)
    subdirs = [
        entry["name"]
        for entry in entries
        if entry["type"] == "directory" and entry["name"].rstrip("/") != path
    ]
    return sorted(subdirs)

def find_metrics_folders(base_folder: str) -> List[str]:
    """Return the sorted, de-duplicated top-level directory names in `base_folder`.

    Only immediate children (maxdepth=1) whose listing entry has
    type == "directory" are included.
    """
    base_data_df = get_datafolder(base_folder)
    listing = base_data_df.find("", detail=True, maxdepth=1, withdirs=True)
    # Build a set directly and sort exactly once; the original sorted the
    # names, de-duplicated them into a set, and then sorted again.
    dirs = {folder for folder, info in listing.items() if info["type"] == "directory"}
    return sorted(dirs)

def fetch_datasets(base_folder: str):
    """Refresh the dataset list and cascade an update to the groups dropdown.

    Returns the dataset names, a dropdown update carrying them, and the
    groups-dropdown update computed from the union over all datasets.
    """
    datasets = sorted(find_metrics_folders(base_folder))
    datasets_update = gr.update(choices=datasets, value=None)
    groups_update = fetch_groups(base_folder, datasets, None, "union")
    return datasets, datasets_update, groups_update

def fetch_groups(base_folder: str, datasets: List[str], old_groups: str, type: str = "intersection"):
    """Compute the grouping choices available across the selected datasets.

    With type == "intersection" only groups present in every dataset are
    offered; otherwise the union is used. The previous selection
    `old_groups` is kept when still valid, and a lone remaining choice is
    auto-selected. Returns a gr.update for the groups dropdown.
    """
    if not datasets:
        return gr.update(choices=[], value=None)

    # List each dataset's group subfolders concurrently (remote listings).
    with ThreadPoolExecutor() as pool:
        per_dataset = list(
            pool.map(lambda run: [Path(x).name for x in find_folders(base_folder, run)], datasets)
        )
    if not per_dataset:
        return gr.update(choices=[], value=None)

    group_sets = [set(names) for names in per_dataset]
    if type == "intersection":
        new_choices = set.intersection(*group_sets)
    else:
        new_choices = set.union(*group_sets)

    # Preserve the old selection only when it survives the recomputation.
    value = old_groups if old_groups and old_groups in new_choices else None

    if value is None and len(new_choices) == 1:
        value = next(iter(new_choices))

    return gr.update(choices=sorted(new_choices), value=value)

def fetch_metrics(base_folder: str, datasets: List[str], group: str, old_metrics: str, type: str = "intersection"):
    """Compute the metric choices available for `group` across the datasets.

    Mirrors fetch_groups: intersection or union of each dataset's metric
    subfolders under `group`, keeping `old_metrics` when still valid and
    auto-selecting a lone remaining choice. Returns a gr.update.
    """
    if not group:
        return gr.update(choices=[], value=None)

    # List each dataset's metric subfolders concurrently (remote listings).
    with ThreadPoolExecutor() as pool:
        per_dataset = list(
            pool.map(lambda run: [Path(x).name for x in find_folders(base_folder, f"{run}/{group}")], datasets)
        )
    if not per_dataset:
        return gr.update(choices=[], value=None)

    metric_sets = [set(names) for names in per_dataset]
    if type == "intersection":
        new_possibles_choices = set.intersection(*metric_sets)
    else:
        new_possibles_choices = set.union(*metric_sets)

    # Preserve the old selection only when it survives the recomputation.
    value = old_metrics if old_metrics and old_metrics in new_possibles_choices else None

    if value is None and len(new_possibles_choices) == 1:
        value = next(iter(new_possibles_choices))

    return gr.update(choices=sorted(new_possibles_choices), value=value)

def reverse_search(base_folder: str, possible_datasets: List[str], grouping: str, metric_name: str) -> str:
    """Return a newline-joined list of datasets that contain the given metric.

    Each candidate dataset is probed concurrently via metric_exists.
    """
    def probe(dataset: str):
        # Keep the dataset name when the metric file exists, else drop it.
        return dataset if metric_exists(base_folder, dataset, metric_name, grouping) else None

    with ThreadPoolExecutor() as pool:
        probed = list(pool.map(probe, possible_datasets))
    return "\n".join(name for name in probed if name is not None)

def reverse_search_add(datasets: List[str], reverse_search_results: str) -> List[str]:
    """Merge reverse-search result names into `datasets`, deduplicated and sorted.

    `reverse_search_results` is the newline-joined string produced by
    reverse_search. Blank lines — and an entirely empty results string —
    are ignored; the original split would otherwise inject an empty-string
    entry ("") into the dataset list.
    """
    datasets = datasets or []
    found = [line for line in reverse_search_results.strip().split("\n") if line]
    return sorted(set(datasets + found))

def metric_exists(base_folder: str, path: str, metric_name: str, group_by: str) -> bool:
    """Return True when the metric.json for this metric/grouping is present."""
    data_folder = get_datafolder(base_folder)
    metric_file = f"{path}/{group_by}/{metric_name}/metric.json"
    return data_folder.exists(metric_file)

@tenacity.retry(stop=tenacity.stop_after_attempt(5))
def load_metrics(base_folder: str, path: str, metric_name: str, group_by: str) -> MetricStatsDict:
    """Load a metric.json and parse it into a MetricStatsDict.

    Wrapped in tenacity.retry (up to 5 attempts) to ride out transient
    remote-filesystem failures.
    """
    data_folder = get_datafolder(base_folder)
    metric_file = f"{path}/{group_by}/{metric_name}/metric.json"
    with data_folder.open(metric_file) as fh:
        raw = json.load(fh)
    return MetricStatsDict.from_dict(raw)

def load_data(dataset_path: str, base_folder: str, grouping: str, metric_name: str) -> MetricStatsDict:
    """Convenience wrapper: load the metric stats for a single dataset path."""
    return load_metrics(
        base_folder,
        dataset_path,
        metric_name,
        grouping,
    )