Aston-xMAD's picture
init commit
b37c16f verified
#!/usr/bin/env python
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import importlib
import inspect
import io
import json
import os
import tempfile
from typing import Any, Dict, List, Optional, Union
from huggingface_hub import create_repo, hf_hub_download, metadata_update, upload_folder
from huggingface_hub.utils import RepositoryNotFoundError, build_hf_headers, get_session
from ..dynamic_module_utils import custom_object_save, get_class_from_dynamic_module, get_imports
from ..image_utils import is_pil_image
from ..models.auto import AutoProcessor
from ..utils import (
CONFIG_NAME,
cached_file,
is_accelerate_available,
is_torch_available,
is_vision_available,
logging,
)
from .agent_types import handle_agent_inputs, handle_agent_outputs
logger = logging.get_logger(__name__)
if is_torch_available():
import torch
if is_accelerate_available():
from accelerate import PartialState
from accelerate.utils import send_to_device
TOOL_CONFIG_FILE = "tool_config.json"
def get_repo_type(repo_id, repo_type=None, **hub_kwargs):
if repo_type is not None:
return repo_type
try:
hf_hub_download(repo_id, TOOL_CONFIG_FILE, repo_type="space", **hub_kwargs)
return "space"
except RepositoryNotFoundError:
try:
hf_hub_download(repo_id, TOOL_CONFIG_FILE, repo_type="model", **hub_kwargs)
return "model"
except RepositoryNotFoundError:
raise EnvironmentError(f"`{repo_id}` does not seem to be a valid repo identifier on the Hub.")
except Exception:
return "model"
except Exception:
return "space"
# docstyle-ignore
APP_FILE_TEMPLATE = """from transformers import launch_gradio_demo
from {module_name} import {class_name}
launch_gradio_demo({class_name})
"""
class Tool:
"""
A base class for the functions used by the agent. Subclass this and implement the `__call__` method as well as the
following class attributes:
- **description** (`str`) -- A short description of what your tool does, the inputs it expects and the output(s) it
will return. For instance 'This is a tool that downloads a file from a `url`. It takes the `url` as input, and
returns the text contained in the file'.
- **name** (`str`) -- A performative name that will be used for your tool in the prompt to the agent. For instance
`"text-classifier"` or `"image_generator"`.
- **inputs** (`List[str]`) -- The list of modalities expected for the inputs (in the same order as in the call).
Modalitiies should be `"text"`, `"image"` or `"audio"`. This is only used by `launch_gradio_demo` or to make a
nice space from your tool.
- **outputs** (`List[str]`) -- The list of modalities returned but the tool (in the same order as the return of the
call method). Modalitiies should be `"text"`, `"image"` or `"audio"`. This is only used by `launch_gradio_demo`
or to make a nice space from your tool.
You can also override the method [`~Tool.setup`] if your tool as an expensive operation to perform before being
usable (such as loading a model). [`~Tool.setup`] will be called the first time you use your tool, but not at
instantiation.
"""
description: str = "This is a tool that ..."
name: str = ""
inputs: List[str]
outputs: List[str]
def __init__(self, *args, **kwargs):
self.is_initialized = False
def __call__(self, *args, **kwargs):
return NotImplemented("Write this method in your subclass of `Tool`.")
def setup(self):
"""
Overwrite this method here for any operation that is expensive and needs to be executed before you start using
your tool. Such as loading a big model.
"""
self.is_initialized = True
def save(self, output_dir):
"""
Saves the relevant code files for your tool so it can be pushed to the Hub. This will copy the code of your
tool in `output_dir` as well as autogenerate:
- a config file named `tool_config.json`
- an `app.py` file so that your tool can be converted to a space
- a `requirements.txt` containing the names of the module used by your tool (as detected when inspecting its
code)
You should only use this method to save tools that are defined in a separate module (not `__main__`).
Args:
output_dir (`str`): The folder in which you want to save your tool.
"""
os.makedirs(output_dir, exist_ok=True)
# Save module file
if self.__module__ == "__main__":
raise ValueError(
f"We can't save the code defining {self} in {output_dir} as it's been defined in __main__. You "
"have to put this code in a separate module so we can include it in the saved folder."
)
module_files = custom_object_save(self, output_dir)
module_name = self.__class__.__module__
last_module = module_name.split(".")[-1]
full_name = f"{last_module}.{self.__class__.__name__}"
# Save config file
config_file = os.path.join(output_dir, "tool_config.json")
if os.path.isfile(config_file):
with open(config_file, "r", encoding="utf-8") as f:
tool_config = json.load(f)
else:
tool_config = {}
tool_config = {"tool_class": full_name, "description": self.description, "name": self.name}
with open(config_file, "w", encoding="utf-8") as f:
f.write(json.dumps(tool_config, indent=2, sort_keys=True) + "\n")
# Save app file
app_file = os.path.join(output_dir, "app.py")
with open(app_file, "w", encoding="utf-8") as f:
f.write(APP_FILE_TEMPLATE.format(module_name=last_module, class_name=self.__class__.__name__))
# Save requirements file
requirements_file = os.path.join(output_dir, "requirements.txt")
imports = []
for module in module_files:
imports.extend(get_imports(module))
imports = list(set(imports))
with open(requirements_file, "w", encoding="utf-8") as f:
f.write("\n".join(imports) + "\n")
@classmethod
def from_hub(
cls,
repo_id: str,
model_repo_id: Optional[str] = None,
token: Optional[str] = None,
remote: bool = False,
**kwargs,
):
"""
Loads a tool defined on the Hub.
<Tip warning={true}>
Loading a tool from the Hub means that you'll download the tool and execute it locally.
ALWAYS inspect the tool you're downloading before loading it within your runtime, as you would do when
installing a package using pip/npm/apt.
</Tip>
Args:
repo_id (`str`):
The name of the repo on the Hub where your tool is defined.
model_repo_id (`str`, *optional*):
If your tool uses a model and you want to use a different model than the default, you can pass a second
repo ID or an endpoint url to this argument.
token (`str`, *optional*):
The token to identify you on hf.co. If unset, will use the token generated when running
`huggingface-cli login` (stored in `~/.huggingface`).
remote (`bool`, *optional*, defaults to `False`):
Whether to use your tool by downloading the model or (if it is available) with an inference endpoint.
kwargs (additional keyword arguments, *optional*):
Additional keyword arguments that will be split in two: all arguments relevant to the Hub (such as
`cache_dir`, `revision`, `subfolder`) will be used when downloading the files for your tool, and the
others will be passed along to its init.
"""
if remote and model_repo_id is None:
endpoints = get_default_endpoints()
if repo_id not in endpoints:
raise ValueError(
f"Could not infer a default endpoint for {repo_id}, you need to pass one using the "
"`model_repo_id` argument."
)
model_repo_id = endpoints[repo_id]
hub_kwargs_names = [
"cache_dir",
"force_download",
"resume_download",
"proxies",
"revision",
"repo_type",
"subfolder",
"local_files_only",
]
hub_kwargs = {k: v for k, v in kwargs.items() if k in hub_kwargs_names}
# Try to get the tool config first.
hub_kwargs["repo_type"] = get_repo_type(repo_id, **hub_kwargs)
resolved_config_file = cached_file(
repo_id,
TOOL_CONFIG_FILE,
token=token,
**hub_kwargs,
_raise_exceptions_for_gated_repo=False,
_raise_exceptions_for_missing_entries=False,
_raise_exceptions_for_connection_errors=False,
)
is_tool_config = resolved_config_file is not None
if resolved_config_file is None:
resolved_config_file = cached_file(
repo_id,
CONFIG_NAME,
token=token,
**hub_kwargs,
_raise_exceptions_for_gated_repo=False,
_raise_exceptions_for_missing_entries=False,
_raise_exceptions_for_connection_errors=False,
)
if resolved_config_file is None:
raise EnvironmentError(
f"{repo_id} does not appear to provide a valid configuration in `tool_config.json` or `config.json`."
)
with open(resolved_config_file, encoding="utf-8") as reader:
config = json.load(reader)
if not is_tool_config:
if "custom_tool" not in config:
raise EnvironmentError(
f"{repo_id} does not provide a mapping to custom tools in its configuration `config.json`."
)
custom_tool = config["custom_tool"]
else:
custom_tool = config
tool_class = custom_tool["tool_class"]
tool_class = get_class_from_dynamic_module(tool_class, repo_id, token=token, **hub_kwargs)
if len(tool_class.name) == 0:
tool_class.name = custom_tool["name"]
if tool_class.name != custom_tool["name"]:
logger.warning(
f"{tool_class.__name__} implements a different name in its configuration and class. Using the tool "
"configuration name."
)
tool_class.name = custom_tool["name"]
if len(tool_class.description) == 0:
tool_class.description = custom_tool["description"]
if tool_class.description != custom_tool["description"]:
logger.warning(
f"{tool_class.__name__} implements a different description in its configuration and class. Using the "
"tool configuration description."
)
tool_class.description = custom_tool["description"]
if remote:
return RemoteTool(model_repo_id, token=token, tool_class=tool_class)
return tool_class(model_repo_id, token=token, **kwargs)
def push_to_hub(
self,
repo_id: str,
commit_message: str = "Upload tool",
private: Optional[bool] = None,
token: Optional[Union[bool, str]] = None,
create_pr: bool = False,
) -> str:
"""
Upload the tool to the Hub.
Parameters:
repo_id (`str`):
The name of the repository you want to push your tool to. It should contain your organization name when
pushing to a given organization.
commit_message (`str`, *optional*, defaults to `"Upload tool"`):
Message to commit while pushing.
private (`bool`, *optional*):
Whether or not the repository created should be private.
token (`bool` or `str`, *optional*):
The token to use as HTTP bearer authorization for remote files. If unset, will use the token generated
when running `huggingface-cli login` (stored in `~/.huggingface`).
create_pr (`bool`, *optional*, defaults to `False`):
Whether or not to create a PR with the uploaded files or directly commit.
"""
repo_url = create_repo(
repo_id=repo_id, token=token, private=private, exist_ok=True, repo_type="space", space_sdk="gradio"
)
repo_id = repo_url.repo_id
metadata_update(repo_id, {"tags": ["tool"]}, repo_type="space")
with tempfile.TemporaryDirectory() as work_dir:
# Save all files.
self.save(work_dir)
logger.info(f"Uploading the following files to {repo_id}: {','.join(os.listdir(work_dir))}")
return upload_folder(
repo_id=repo_id,
commit_message=commit_message,
folder_path=work_dir,
token=token,
create_pr=create_pr,
repo_type="space",
)
@staticmethod
def from_gradio(gradio_tool):
"""
Creates a [`Tool`] from a gradio tool.
"""
class GradioToolWrapper(Tool):
def __init__(self, _gradio_tool):
super().__init__()
self.name = _gradio_tool.name
self.description = _gradio_tool.description
GradioToolWrapper.__call__ = gradio_tool.run
return GradioToolWrapper(gradio_tool)
class RemoteTool(Tool):
"""
A [`Tool`] that will make requests to an inference endpoint.
Args:
endpoint_url (`str`, *optional*):
The url of the endpoint to use.
token (`str`, *optional*):
The token to use as HTTP bearer authorization for remote files. If unset, will use the token generated when
running `huggingface-cli login` (stored in `~/.huggingface`).
tool_class (`type`, *optional*):
The corresponding `tool_class` if this is a remote version of an existing tool. Will help determine when
the output should be converted to another type (like images).
"""
def __init__(self, endpoint_url=None, token=None, tool_class=None):
self.endpoint_url = endpoint_url
self.client = EndpointClient(endpoint_url, token=token)
self.tool_class = tool_class
def prepare_inputs(self, *args, **kwargs):
"""
Prepare the inputs received for the HTTP client sending data to the endpoint. Positional arguments will be
matched with the signature of the `tool_class` if it was provided at instantation. Images will be encoded into
bytes.
You can override this method in your custom class of [`RemoteTool`].
"""
inputs = kwargs.copy()
if len(args) > 0:
if self.tool_class is not None:
# Match args with the signature
if issubclass(self.tool_class, PipelineTool):
call_method = self.tool_class.encode
else:
call_method = self.tool_class.__call__
signature = inspect.signature(call_method).parameters
parameters = [
k
for k, p in signature.items()
if p.kind not in [inspect._ParameterKind.VAR_POSITIONAL, inspect._ParameterKind.VAR_KEYWORD]
]
if parameters[0] == "self":
parameters = parameters[1:]
if len(args) > len(parameters):
raise ValueError(
f"{self.tool_class} only accepts {len(parameters)} arguments but {len(args)} were given."
)
for arg, name in zip(args, parameters):
inputs[name] = arg
elif len(args) > 1:
raise ValueError("A `RemoteTool` can only accept one positional input.")
elif len(args) == 1:
if is_pil_image(args[0]):
return {"inputs": self.client.encode_image(args[0])}
return {"inputs": args[0]}
for key, value in inputs.items():
if is_pil_image(value):
inputs[key] = self.client.encode_image(value)
return {"inputs": inputs}
def extract_outputs(self, outputs):
"""
You can override this method in your custom class of [`RemoteTool`] to apply some custom post-processing of the
outputs of the endpoint.
"""
return outputs
def __call__(self, *args, **kwargs):
args, kwargs = handle_agent_inputs(*args, **kwargs)
output_image = self.tool_class is not None and self.tool_class.outputs == ["image"]
inputs = self.prepare_inputs(*args, **kwargs)
if isinstance(inputs, dict):
outputs = self.client(**inputs, output_image=output_image)
else:
outputs = self.client(inputs, output_image=output_image)
if isinstance(outputs, list) and len(outputs) == 1 and isinstance(outputs[0], list):
outputs = outputs[0]
outputs = handle_agent_outputs(outputs, self.tool_class.outputs if self.tool_class is not None else None)
return self.extract_outputs(outputs)
class PipelineTool(Tool):
"""
A [`Tool`] tailored towards Transformer models. On top of the class attributes of the base class [`Tool`], you will
need to specify:
- **model_class** (`type`) -- The class to use to load the model in this tool.
- **default_checkpoint** (`str`) -- The default checkpoint that should be used when the user doesn't specify one.
- **pre_processor_class** (`type`, *optional*, defaults to [`AutoProcessor`]) -- The class to use to load the
pre-processor
- **post_processor_class** (`type`, *optional*, defaults to [`AutoProcessor`]) -- The class to use to load the
post-processor (when different from the pre-processor).
Args:
model (`str` or [`PreTrainedModel`], *optional*):
The name of the checkpoint to use for the model, or the instantiated model. If unset, will default to the
value of the class attribute `default_checkpoint`.
pre_processor (`str` or `Any`, *optional*):
The name of the checkpoint to use for the pre-processor, or the instantiated pre-processor (can be a
tokenizer, an image processor, a feature extractor or a processor). Will default to the value of `model` if
unset.
post_processor (`str` or `Any`, *optional*):
The name of the checkpoint to use for the post-processor, or the instantiated pre-processor (can be a
tokenizer, an image processor, a feature extractor or a processor). Will default to the `pre_processor` if
unset.
device (`int`, `str` or `torch.device`, *optional*):
The device on which to execute the model. Will default to any accelerator available (GPU, MPS etc...), the
CPU otherwise.
device_map (`str` or `dict`, *optional*):
If passed along, will be used to instantiate the model.
model_kwargs (`dict`, *optional*):
Any keyword argument to send to the model instantiation.
token (`str`, *optional*):
The token to use as HTTP bearer authorization for remote files. If unset, will use the token generated when
running `huggingface-cli login` (stored in `~/.huggingface`).
hub_kwargs (additional keyword arguments, *optional*):
Any additional keyword argument to send to the methods that will load the data from the Hub.
"""
pre_processor_class = AutoProcessor
model_class = None
post_processor_class = AutoProcessor
default_checkpoint = None
def __init__(
self,
model=None,
pre_processor=None,
post_processor=None,
device=None,
device_map=None,
model_kwargs=None,
token=None,
**hub_kwargs,
):
if not is_torch_available():
raise ImportError("Please install torch in order to use this tool.")
if not is_accelerate_available():
raise ImportError("Please install accelerate in order to use this tool.")
if model is None:
if self.default_checkpoint is None:
raise ValueError("This tool does not implement a default checkpoint, you need to pass one.")
model = self.default_checkpoint
if pre_processor is None:
pre_processor = model
self.model = model
self.pre_processor = pre_processor
self.post_processor = post_processor
self.device = device
self.device_map = device_map
self.model_kwargs = {} if model_kwargs is None else model_kwargs
if device_map is not None:
self.model_kwargs["device_map"] = device_map
self.hub_kwargs = hub_kwargs
self.hub_kwargs["token"] = token
super().__init__()
def setup(self):
"""
Instantiates the `pre_processor`, `model` and `post_processor` if necessary.
"""
if isinstance(self.pre_processor, str):
self.pre_processor = self.pre_processor_class.from_pretrained(self.pre_processor, **self.hub_kwargs)
if isinstance(self.model, str):
self.model = self.model_class.from_pretrained(self.model, **self.model_kwargs, **self.hub_kwargs)
if self.post_processor is None:
self.post_processor = self.pre_processor
elif isinstance(self.post_processor, str):
self.post_processor = self.post_processor_class.from_pretrained(self.post_processor, **self.hub_kwargs)
if self.device is None:
if self.device_map is not None:
self.device = list(self.model.hf_device_map.values())[0]
else:
self.device = PartialState().default_device
if self.device_map is None:
self.model.to(self.device)
super().setup()
def encode(self, raw_inputs):
"""
Uses the `pre_processor` to prepare the inputs for the `model`.
"""
return self.pre_processor(raw_inputs)
def forward(self, inputs):
"""
Sends the inputs through the `model`.
"""
with torch.no_grad():
return self.model(**inputs)
def decode(self, outputs):
"""
Uses the `post_processor` to decode the model output.
"""
return self.post_processor(outputs)
def __call__(self, *args, **kwargs):
args, kwargs = handle_agent_inputs(*args, **kwargs)
if not self.is_initialized:
self.setup()
encoded_inputs = self.encode(*args, **kwargs)
encoded_inputs = send_to_device(encoded_inputs, self.device)
outputs = self.forward(encoded_inputs)
outputs = send_to_device(outputs, "cpu")
decoded_outputs = self.decode(outputs)
return handle_agent_outputs(decoded_outputs, self.outputs)
def launch_gradio_demo(tool_class: Tool):
"""
Launches a gradio demo for a tool. The corresponding tool class needs to properly implement the class attributes
`inputs` and `outputs`.
Args:
tool_class (`type`): The class of the tool for which to launch the demo.
"""
try:
import gradio as gr
except ImportError:
raise ImportError("Gradio should be installed in order to launch a gradio demo.")
tool = tool_class()
def fn(*args, **kwargs):
return tool(*args, **kwargs)
gr.Interface(
fn=fn,
inputs=tool_class.inputs,
outputs=tool_class.outputs,
title=tool_class.__name__,
article=tool.description,
).launch()
TASK_MAPPING = {
"document-question-answering": "DocumentQuestionAnsweringTool",
"image-captioning": "ImageCaptioningTool",
"image-question-answering": "ImageQuestionAnsweringTool",
"image-segmentation": "ImageSegmentationTool",
"speech-to-text": "SpeechToTextTool",
"summarization": "TextSummarizationTool",
"text-classification": "TextClassificationTool",
"text-question-answering": "TextQuestionAnsweringTool",
"text-to-speech": "TextToSpeechTool",
"translation": "TranslationTool",
}
def get_default_endpoints():
endpoints_file = cached_file("huggingface-tools/default-endpoints", "default_endpoints.json", repo_type="dataset")
with open(endpoints_file, "r", encoding="utf-8") as f:
endpoints = json.load(f)
return endpoints
def supports_remote(task_or_repo_id):
endpoints = get_default_endpoints()
return task_or_repo_id in endpoints
def load_tool(task_or_repo_id, model_repo_id=None, remote=False, token=None, **kwargs):
"""
Main function to quickly load a tool, be it on the Hub or in the Transformers library.
<Tip warning={true}>
Loading a tool means that you'll download the tool and execute it locally.
ALWAYS inspect the tool you're downloading before loading it within your runtime, as you would do when
installing a package using pip/npm/apt.
</Tip>
Args:
task_or_repo_id (`str`):
The task for which to load the tool or a repo ID of a tool on the Hub. Tasks implemented in Transformers
are:
- `"document-question-answering"`
- `"image-captioning"`
- `"image-question-answering"`
- `"image-segmentation"`
- `"speech-to-text"`
- `"summarization"`
- `"text-classification"`
- `"text-question-answering"`
- `"text-to-speech"`
- `"translation"`
model_repo_id (`str`, *optional*):
Use this argument to use a different model than the default one for the tool you selected.
remote (`bool`, *optional*, defaults to `False`):
Whether to use your tool by downloading the model or (if it is available) with an inference endpoint.
token (`str`, *optional*):
The token to identify you on hf.co. If unset, will use the token generated when running `huggingface-cli
login` (stored in `~/.huggingface`).
kwargs (additional keyword arguments, *optional*):
Additional keyword arguments that will be split in two: all arguments relevant to the Hub (such as
`cache_dir`, `revision`, `subfolder`) will be used when downloading the files for your tool, and the others
will be passed along to its init.
"""
if task_or_repo_id in TASK_MAPPING:
tool_class_name = TASK_MAPPING[task_or_repo_id]
main_module = importlib.import_module("transformers")
tools_module = main_module.tools
tool_class = getattr(tools_module, tool_class_name)
if remote:
if model_repo_id is None:
endpoints = get_default_endpoints()
if task_or_repo_id not in endpoints:
raise ValueError(
f"Could not infer a default endpoint for {task_or_repo_id}, you need to pass one using the "
"`model_repo_id` argument."
)
model_repo_id = endpoints[task_or_repo_id]
return RemoteTool(model_repo_id, token=token, tool_class=tool_class)
else:
return tool_class(model_repo_id, token=token, **kwargs)
else:
logger.warning_once(
f"You're loading a tool from the Hub from {model_repo_id}. Please make sure this is a source that you "
f"trust as the code within that tool will be executed on your machine. Always verify the code of "
f"the tools that you load. We recommend specifying a `revision` to ensure you're loading the "
f"code that you have checked."
)
return Tool.from_hub(task_or_repo_id, model_repo_id=model_repo_id, token=token, remote=remote, **kwargs)
def add_description(description):
"""
A decorator that adds a description to a function.
"""
def inner(func):
func.description = description
func.name = func.__name__
return func
return inner
## Will move to the Hub
class EndpointClient:
def __init__(self, endpoint_url: str, token: Optional[str] = None):
self.headers = {**build_hf_headers(token=token), "Content-Type": "application/json"}
self.endpoint_url = endpoint_url
@staticmethod
def encode_image(image):
_bytes = io.BytesIO()
image.save(_bytes, format="PNG")
b64 = base64.b64encode(_bytes.getvalue())
return b64.decode("utf-8")
@staticmethod
def decode_image(raw_image):
if not is_vision_available():
raise ImportError(
"This tool returned an image but Pillow is not installed. Please install it (`pip install Pillow`)."
)
from PIL import Image
b64 = base64.b64decode(raw_image)
_bytes = io.BytesIO(b64)
return Image.open(_bytes)
def __call__(
self,
inputs: Optional[Union[str, Dict, List[str], List[List[str]]]] = None,
params: Optional[Dict] = None,
data: Optional[bytes] = None,
output_image: bool = False,
) -> Any:
# Build payload
payload = {}
if inputs:
payload["inputs"] = inputs
if params:
payload["parameters"] = params
# Make API call
response = get_session().post(self.endpoint_url, headers=self.headers, json=payload, data=data)
# By default, parse the response for the user.
if output_image:
return self.decode_image(response.content)
else:
return response.json()