ShaderEval /
Vipitis's picture
changing the default model (linked on space too)
history blame contribute delete
No virus
14.1 kB
# Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor.
# #TODO: license: MIT pending (evaluation suite itself can be completely open, nothing copyleft from the dataset reaches us here)
"""TODO: Add a description here."""
# TODO: Add BibTeX citation
# NOTE(review): in this copy the triple-quoted string opened on the next line is never closed, and the
# description sentence further down has no visible assignment in front of it. Presumably the closing
# quotes and a second module-level constant were lost - confirm against the upstream file.
_CITATION = """\
title = {A great new module},
authors={huggingface, Inc.},
# TODO: Add description of the module here
This EvaluationSuite currently solves {1} tasks to test code intelligence of genereative language models for "creative programming" (fragment shaders).
# via
import evaluate
from evaluate import evaluator #used by
from evaluate.evaluator.utils import DatasetColumn # used in .prepare_data()
from evaluate.evaluation_suite import SubTask
from datasets import Dataset
from typing import Any, Callable, Dict, List, Optional, Union # used in .prepare_pipeline()
import transformers
from transformers import Pipeline, pipeline, GenerationConfig, AutoTokenizer #GenerationConfig to specify greedy and avoid error
from datasets import load_dataset #used by
# write a custom evaluator, inheriting from evaluate.TextGenerationEvaluator:
class ReturnGenerationEvaluator(evaluate.TextGenerationEvaluator):
def __init__(self, task="text-generation", default_metric_name="exact_match", predictions_prefix: str = "generated"):
    """Set up the evaluator for return-statement generation.

    Args:
        task: Pipeline task name, forwarded to the parent evaluator.
        default_metric_name: Metric name, forwarded to the parent evaluator.
        predictions_prefix: Prefix of the key under which each pipeline output
            stores its text; `predictions_processor` reads
            `f"{predictions_prefix}_text"`.
    """
    super().__init__(task=task, default_metric_name=default_metric_name)
    self.predictions_prefix = predictions_prefix
# Generation settings that force greedy decoding; `_update_generation_config`
# later writes the stop/pad token ids into this same object.
greedy_cfg = GenerationConfig(
    do_sample=False,  # default to ensure greedy
    num_beams=1,  # same as above
)
# These kwargs are for the pipeline call, not the pipeline init - but that seems to still work.
# `prepare_data` adds "max_new_tokens" to this dict before the pipeline is called.
PIPELINE_KWARGS = {"return_full_text": False, "generation_config": greedy_cfg}
# For the pipeline init we need to copy the whole function and add two lines. This still prints errors due to the pad_token_id = eos_token_id change.
# from:
# Builds (or validates) the pipeline used for evaluation and adapts it for long-input text generation.
# NOTE(review): this copy of the function is incomplete - the end of the signature, the docstring
# delimiters, the closing brackets of the `if (...)`, `pipeline(...)` and `raise ValueError(...)`
# expressions, and what are presumably `else:` lines are not present. Restore them from the upstream
# file before running; the notes below only describe what the visible lines do.
def prepare_pipeline(
model_or_pipeline: Union[str, "Pipeline", Callable, "PreTrainedModel", "TFPreTrainedModel"], # noqa: F821
tokenizer: Union["PreTrainedTokenizerBase", "FeatureExtractionMixin"] = None, # noqa: F821
feature_extractor: Union["PreTrainedTokenizerBase", "FeatureExtractionMixin"] = None, # noqa: F821
device: int = None,
Prepare pipeline.
model_or_pipeline (`str` or `Pipeline` or `Callable` or `PreTrainedModel` or `TFPreTrainedModel`,
defaults to `None`):
If the argument in not specified, we initialize the default pipeline for the task. If the argument is of the type `str` or
is a model instance, we use it to initialize a new `Pipeline` with the given model. Otherwise we assume the
argument specifies a pre-initialized pipeline.
preprocessor (`PreTrainedTokenizerBase` or `FeatureExtractionMixin`, *optional*, defaults to `None`):
Argument can be used to overwrite a default preprocessor if `model_or_pipeline` represents a model for
which we build a pipeline. If `model_or_pipeline` is `None` or a pre-initialized pipeline, we ignore
this argument.
The initialized pipeline, with modifications for the specific task of generating text, even with long inputs.
# Fall back to the evaluator's own device inference when the caller did not pick a device.
if device is None:
device = self._infer_device()
# A checkpoint name or a model instance means a new pipeline has to be constructed.
if (
isinstance(model_or_pipeline, str)
or isinstance(model_or_pipeline, transformers.PreTrainedModel)
or isinstance(model_or_pipeline, transformers.TFPreTrainedModel)
if isinstance(model_or_pipeline, str):
# load tokenizer manually, since the pipeline does fail to do so at times. needed for bigcode/santacoder for example.
tokenizer = AutoTokenizer.from_pretrained(model_or_pipeline, trust_remote_code=True)
# NOTE(review): the leading arguments of this call (task, model, tokenizer, ...) are not visible here - confirm.
pipe = pipeline(
# my additions here:
handle_long_generation= "hole", #our solution? relevant:
# pad_token_id=tokenizer.eos_token_id, #to avoid the warning, however there might be issues as tokenizers will call this differently.
do_sample=False, #important to get reproduceable results but we need to make sure the generator is deterministic
trust_remote_code=True, # do we need this for some custom models? need to test if it works right here. one example is bigcode/santacoder
# NOTE(review): as shown, `pipe = model_or_pipeline` would always overwrite the default pipeline built
# for `None`; presumably it belongs to an `else:` branch that is missing from this copy - confirm.
if model_or_pipeline is None:
pipe = pipeline(self.task, device=device)
pipe = model_or_pipeline
# if tokenizer is not None and feature_extractor is not None:
# logger.warning("Ignoring the value of the preprocessor argument (`tokenizer` or `feature_extractor`).") #excluded warning because I didn't import logger
# Reject pipelines built for a different task (any "translation*" pipeline is accepted for "translation").
if (pipe.task != self.task) and not (self.task == "translation" and pipe.task.startswith("translation")):
raise ValueError(
f"Incompatible `model_or_pipeline`. Please specify `model_or_pipeline` compatible with the `{self.task}` task."
# fixinging default for max_lenght
# The resolved context length is written into the model config of the pipeline.
pipe.model.config.max_length = self._resolve_context_lenght(pipe=pipe)
# update the generation config with information from the pipe
# NOTE(review): no statement follows the comment above in this copy; presumably a call to
# `self._update_generation_config(pipe)` belongs here - confirm.
return pipe
def _update_generation_config(self, pipe):
    """Make generation stop at the first semicolon.

    Sets `eos_token_id` and `pad_token_id` on `self.greedy_cfg` from the
    tokenizer vocabulary of `pipe`. `GenerationConfig.update` also exists,
    but it only replaces existing kwargs, so the attributes are assigned
    directly.

    Args:
        pipe: A pipeline whose `tokenizer.vocab` maps token strings to ids.
            The tokenizer is only available once a pipe is made.

    Returns:
        None. `self.greedy_cfg` is modified in place. If no vocabulary entry
        contains ";", a warning is emitted and the config is left untouched.
    """
    semicolon_token_ids = [token_id for token, token_id in pipe.tokenizer.vocab.items() if ";" in token]
    if not semicolon_token_ids:
        # Without this guard an empty list would be stored as eos_token_id
        # and the pad_token_id lookup below would raise IndexError.
        import warnings

        warnings.warn(
            "No token containing ';' found in the tokenizer vocabulary; "
            "generation config was not updated.",
            stacklevel=2,
        )
        return None
    # eos_token_id can be a list, so every token containing a semicolon stops generation.
    self.greedy_cfg.eos_token_id = semicolon_token_ids
    # pad_token_id has to be an int, so the first match is used.
    self.greedy_cfg.pad_token_id = semicolon_token_ids[0]
    return None
def _resolve_context_lenght(self, model_or_pipeline=None, pipe=None):  # TODO should really copy the typing hints here.
    """Work out the context length to use for generation.

    Args:
        model_or_pipeline: Optional model object. Only a `transformers.GPT2Model`
            instance is recognised; a checkpoint name (str) never matches.
        pipe: Optional pipeline whose tokenizer reports `model_max_length`.

    Returns:
        `model_or_pipeline.config.n_ctx` for a GPT2 model, otherwise
        `pipe.tokenizer.model_max_length` when a pipe is given, otherwise 1024.
    """
    # The None check short-circuits the isinstance test; the result is the same either way.
    if model_or_pipeline is not None and isinstance(model_or_pipeline, transformers.GPT2Model):
        return model_or_pipeline.config.n_ctx  # how GPT2 models might handle it
    if pipe is not None:
        # This is set to something small for the pipeline's default task, but we would
        # want to put it to the max instead.
        return pipe.tokenizer.model_max_length
    # The tokenizer needs to know the context length for our pipe strategy, but it has to be
    # passed to the tokenizer, not the model. The tokenizer should read from the model config,
    # but that can be wrong, or it has a task overwrite (for "text-generation" you get 50).
    # model_or_pipeline only exists via the .compute call, so we have to take it in.
    return 1024  # we shouldn't return it, but overwrite the tokenizer config, which the pipeline relies on.
def _estimate_stopping(self, labels, **kwargs):
    """Estimate `max_new_tokens` for the pipeline call.

    Counts the characters in the longest reference string and adds 5 (for
    good measure, but probably not needed), capped at the context length.

    Args:
        labels: Iterable of reference strings (`prepare_data` passes the
            stripped return statements). May be empty.
        **kwargs: Forwarded to `_resolve_context_lenght`.

    Returns:
        `int`: the estimated max_new_tokens, never larger than the resolved
        context length. For empty `labels` this is `min(5, context length)`.
    """
    context_lenght = self._resolve_context_lenght(**kwargs)
    # default=0 keeps an empty reference list from raising ValueError in max().
    longest_reference = max((len(ref) for ref in labels), default=0)
    return min(longest_reference + 5, context_lenght)
# this one needs to be adjusted
def predictions_processor(self, predictions, *args, **kwargs):
    """Process the output of the pipeline to be compatible with the metric.

    Each generated text is cut off at the first semicolon and surrounding
    whitespace is stripped (using python str builtins).

    Args:
        predictions: A list of lists of dicts; each dict holds the generated
            text under the key `f"{self.predictions_prefix}_text"`.

    Returns:
        `dict`: All the processed texts, flattened and stored under the
        "predictions" key.
    """
    text_key = f"{self.predictions_prefix}_text"
    return {
        "predictions": [
            pred[text_key].split(";")[0].strip()
            for pred_list in predictions
            for pred in pred_list
        ]
    }
# straight copy, doesn't seem to give me the
def prepare_data(self, data: "Dataset", input_column: str, label_column: str, *args, **kwargs):
    """Prepare the metric references and the pipeline inputs.

    Args:
        data (`Dataset`): Specifies the dataset we will run evaluation on.
        input_column (`str`): Name of the column containing the text feature
            in the dataset specified by `data`.
        label_column (`str`): Name of the column containing the labels in the
            dataset specified by `data`.

    Returns:
        `dict`: metric inputs. Each reference is cut at the first semicolon
        and whitespace-stripped (python str builtins, just like the
        prediction processing).
        `list`: pipeline inputs, i.e. `data[input_column]`.

    Side effects:
        Stores the estimated "max_new_tokens" in `self.PIPELINE_KWARGS`.
    """
    # This will throw an exception with useful error messages.
    self.check_required_columns(data, {"input_column": input_column, "label_column": label_column})

    # Don't put everything in the return statement, so you have the control...
    references = [ref.split(";")[0].strip() for ref in data[label_column]]
    # This is a hack: the generation budget is smuggled in through the pipeline call kwargs.
    self.PIPELINE_KWARGS.update({"max_new_tokens": self._estimate_stopping(references)})
    # DatasetColumn(data, input_column) doesn't seem to work. data[input_column] does,
    # but ignores any of the features of the helper class.
    return {"references": references}, data[input_column]
# via:
# relevant source:
# Evaluation suite that runs its sub-tasks with the custom ReturnGenerationEvaluator.
# NOTE(review): this copy of the class has lost its indentation and several tokens/lines: the dataset
# identifier in `SubTask(...)` and `load_dataset(...)`, the right-hand sides of `task_name =` and
# `args_for_task["data"] =`, the closing brackets of `self.suite`, and presumably the statement that
# adds each `results` dict to `results_all`. Restore them from the upstream file before running.
class Suite(evaluate.EvaluationSuite):
def __init__(self, name):
# Keeps only the text before the first semicolon of each example's "return_statement".
self.preprocessor = lambda x: {"return_statement": x["return_statement"].split(";")[0]} #like this? refactored to RetrunGenerationEvaluator
self.suite = [
# more subtasks are only possible once we can pass custom evaluators. ->
SubTask( #this one is adjusted already
task_type="text-generation", #this call an evaluator, but can you specify your own custom evaluator instead?
split="test", # use this to select a subset of the data during testing, perhaps remove later?
# "metric": "exact_match",
"input_column": "body",
"label_column": "return_statement",
# from:
# Runs every sub-task in `self.suite` against `model_or_pipeline` and returns `results_all`.
# `snippet` is interpolated into the split as `[:{snippet}]`; with the default empty string the
# suffix is `[:]`. NOTE(review): the `int` annotation does not match the `""` default.
def run(
self, model_or_pipeline: Union[str, "Pipeline", Callable, "PreTrainedModel", "TFPreTrainedModel"] = "Vipitis/santacoder-finetuned-Shadertoys-fine", #not so useful default model?
snippet: int = "" # noqa: F821
) -> Dict[str, float]:
results_all = []
for task in self.suite:
task_name =
if task.data_preprocessor: # task requires extra preprocessing is all done inside the Evaluator
ds = load_dataset(, name=task.subset, split=(task.split + f"[:{snippet}]")) =
task_evaluator = ReturnGenerationEvaluator() #this is the change we make: specify our custom evaluator from above.
# The sub-task's own argument dict is extended in place with the model, data, subset and split.
args_for_task = task.args_for_task
args_for_task["model_or_pipeline"] = model_or_pipeline
args_for_task["data"] =
args_for_task["subset"] = task.subset
args_for_task["split"] = (task.split + f"[:{snippet}]") #make a downselection of the split via keywordarg in the .run() call?
results = task_evaluator.compute(**args_for_task)
# Bookkeeping fields added to the metric results of this sub-task.
results["model_cp"] = model_or_pipeline #added this to the output, should be useful. But be careful when passed something that is not a string. #TODO: currently the same for all tasks, maybe move to the list?
results["task_name"] = task_name + "/" + task.subset if task.subset else task_name
results["data_preprocessor"] = str(task.data_preprocessor) if task.data_preprocessor is not None else None
return results_all