Spaces:
Running
Running
import argparse | |
import os | |
import shutil | |
from pathlib import Path | |
from tempfile import TemporaryDirectory | |
from typing import List, Optional, Tuple | |
import torch | |
from huggingface_hub import ( | |
CommitInfo, | |
CommitOperationAdd, | |
Discussion, | |
HfApi, | |
get_repo_discussions, | |
hf_hub_download, | |
) | |
from huggingface_hub.file_download import repo_folder_name | |
from optimum.exporters.onnx import validate_model_outputs | |
from optimum.exporters.tasks import TasksManager | |
from transformers import AutoConfig, AutoTokenizer, is_torch_available | |
from optimum.intel.openvino import ( | |
OVModelForAudioClassification, | |
OVModelForCausalLM, | |
OVModelForFeatureExtraction, | |
OVModelForImageClassification, | |
OVModelForMaskedLM, | |
OVModelForQuestionAnswering, | |
OVModelForSeq2SeqLM, | |
OVModelForSequenceClassification, | |
OVModelForTokenClassification, | |
OVStableDiffusionPipeline, | |
) | |
from optimum.intel.utils.constant import _TASK_ALIASES | |
from optimum.intel.openvino.utils import _HEAD_TO_AUTOMODELS | |
from optimum.exporters import TasksManager | |
SPACES_URL = "https://huggingface.co/spaces/echarlaix/openvino-export" | |
def previous_pr(api: "HfApi", model_id: str, pr_title: str) -> Optional["Discussion"]: | |
try: | |
discussions = api.get_repo_discussions(repo_id=model_id) | |
except Exception: | |
return None | |
for discussion in discussions: | |
if ( | |
discussion.status == "open" | |
and discussion.is_pull_request | |
and discussion.title == pr_title | |
): | |
return discussion | |
def convert_openvino(model_id: str, task: str, folder: str) -> List: | |
task = TasksManager.map_from_synonym(task) | |
if task == "auto": | |
try: | |
task = TasksManager.infer_task_from_model(model_id) | |
except KeyError as e: | |
raise KeyError( | |
f"The task could not be automatically inferred. Please provide the task argument with the relevant task from {', '.join(TasksManager.get_all_tasks())}. {e}" | |
) | |
task = _TASK_ALIASES.get(task, task) | |
if task not in _HEAD_TO_AUTOMODELS: | |
raise ValueError(f"The task '{task}' is not supported, only {_HEAD_TO_AUTOMODELS.keys()} tasks are supported") | |
if task == "text2text-generation": | |
raise ValueError("Export of Seq2Seq models is currently disabled.") | |
auto_model_class = eval(_HEAD_TO_AUTOMODELS[task]) | |
ov_model = auto_model_class.from_pretrained(model_id, export=True) | |
ov_model.save_pretrained(folder) | |
if not isinstance(ov_model, OVStableDiffusionPipeline): | |
try: | |
model = TasksManager.get_model_from_task(task, model_id) | |
exporter_config_class = TasksManager.get_exporter_config_constructor( | |
exporter="openvino", | |
model=model, | |
task=task, | |
model_name=model_id, | |
model_type=model.config.model_type.replace("_", "-"), | |
) | |
openvino_config = exporter_config_class(model.config) | |
inputs = openvino_config.generate_dummy_inputs(framework="pt") | |
ov_outputs = ov_model(**inputs) | |
outputs = model(**inputs) | |
for output_name in ov_outputs: | |
if isinstance(outputs, torch.Tensor) and not torch.allclose(outputs[output_name], ov_outputs[output_name], atol=1e-3): | |
raise ValueError( | |
"The exported model does not have the same outputs as the original model. Export interrupted." | |
) | |
except Exception as e: | |
raise | |
file_names = {elem for elem in os.listdir(folder) if os.path.isfile(os.path.join(folder, elem))} | |
operations = [ | |
CommitOperationAdd( | |
path_in_repo=file_name, path_or_fileobj=os.path.join(folder, file_name) | |
) | |
for file_name in file_names if "openvino" in file_name | |
] | |
dir_names = set(os.listdir(folder)) - file_names | |
for dir_name in dir_names.intersection({"vae_encoder", "vae_decoder", "text_encoder", "unet"}): | |
operations += [ | |
CommitOperationAdd( | |
path_in_repo=os.path.join(dir_name, file_name), | |
path_or_fileobj=os.path.join(folder, dir_name, file_name), | |
) | |
for file_name in os.listdir(os.path.join(folder, dir_name)) if "openvino" in file_name | |
] | |
return operations | |
def convert( | |
api: "HfApi", | |
model_id: str, | |
task: str, | |
force: bool = False, | |
) -> Tuple[int, "CommitInfo"]: | |
pr_title = "Adding OpenVINO file of this model" | |
info = api.model_info(model_id) | |
filenames = set(s.rfilename for s in info.siblings) | |
requesting_user = api.whoami()["name"] | |
if task == "auto": | |
try: | |
task = TasksManager.infer_task_from_model(model_id) | |
except Exception as e: | |
return ( | |
f"### Error: {e}. Please pass explicitely the task as it could not be infered.", | |
None, | |
) | |
with TemporaryDirectory() as d: | |
folder = os.path.join(d, repo_folder_name(repo_id=model_id, repo_type="models")) | |
os.makedirs(folder) | |
new_pr = None | |
try: | |
pr = previous_pr(api, model_id, pr_title) | |
if "openvino_model.xml" in filenames and not force: | |
raise Exception(f"Model {model_id} is already converted, skipping..") | |
elif pr is not None and not force: | |
url = f"https://huggingface.co/{model_id}/discussions/{pr.num}" | |
new_pr = pr | |
raise Exception( | |
f"Model {model_id} already has an open PR check out [{url}]({url})" | |
) | |
else: | |
operations = convert_openvino(model_id, task, folder) | |
commit_description = f""" | |
Beep boop I am the [OpenVINO exporter bot π€]({SPACES_URL}). On behalf of [{requesting_user}](https://huggingface.co/{requesting_user}), I would like to add to this repository the exported OpenVINO model. | |
""" | |
new_pr = api.create_commit( | |
repo_id=model_id, | |
operations=operations, | |
commit_message=pr_title, | |
commit_description=commit_description, | |
create_pr=True, | |
) | |
finally: | |
shutil.rmtree(folder) | |
return "0", new_pr | |