Spaces:
Runtime error
Runtime error
# Copyright 2024 the LlamaFactory team. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import json | |
import os | |
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Sequence, Tuple | |
from ..chat import ChatModel | |
from ..data import Role | |
from ..extras.constants import PEFT_METHODS | |
from ..extras.misc import torch_gc | |
from ..extras.packages import is_gradio_available | |
from .common import QUANTIZATION_BITS, get_save_dir | |
from .locales import ALERTS | |
if TYPE_CHECKING: | |
from ..chat import BaseEngine | |
from .manager import Manager | |
if is_gradio_available(): | |
import gradio as gr | |
class WebChatModel(ChatModel): | |
def __init__(self, manager: "Manager", demo_mode: bool = False, lazy_init: bool = True) -> None: | |
self.manager = manager | |
self.demo_mode = demo_mode | |
self.engine: Optional["BaseEngine"] = None | |
if not lazy_init: # read arguments from command line | |
super().__init__() | |
if demo_mode and os.environ.get("DEMO_MODEL") and os.environ.get("DEMO_TEMPLATE"): # load demo model | |
model_name_or_path = os.environ.get("DEMO_MODEL") | |
template = os.environ.get("DEMO_TEMPLATE") | |
infer_backend = os.environ.get("DEMO_BACKEND", "huggingface") | |
super().__init__( | |
dict(model_name_or_path=model_name_or_path, template=template, infer_backend=infer_backend) | |
) | |
def loaded(self) -> bool: | |
return self.engine is not None | |
def load_model(self, data) -> Generator[str, None, None]: | |
get = lambda elem_id: data[self.manager.get_elem_by_id(elem_id)] | |
lang, model_name, model_path = get("top.lang"), get("top.model_name"), get("top.model_path") | |
finetuning_type, checkpoint_path = get("top.finetuning_type"), get("top.checkpoint_path") | |
error = "" | |
if self.loaded: | |
error = ALERTS["err_exists"][lang] | |
elif not model_name: | |
error = ALERTS["err_no_model"][lang] | |
elif not model_path: | |
error = ALERTS["err_no_path"][lang] | |
elif self.demo_mode: | |
error = ALERTS["err_demo"][lang] | |
if error: | |
gr.Warning(error) | |
yield error | |
return | |
if get("top.quantization_bit") in QUANTIZATION_BITS: | |
quantization_bit = int(get("top.quantization_bit")) | |
else: | |
quantization_bit = None | |
yield ALERTS["info_loading"][lang] | |
args = dict( | |
model_name_or_path=model_path, | |
finetuning_type=finetuning_type, | |
quantization_bit=quantization_bit, | |
quantization_method=get("top.quantization_method"), | |
template=get("top.template"), | |
flash_attn="fa2" if get("top.booster") == "flashattn2" else "auto", | |
use_unsloth=(get("top.booster") == "unsloth"), | |
rope_scaling=get("top.rope_scaling") if get("top.rope_scaling") in ["linear", "dynamic"] else None, | |
infer_backend=get("infer.infer_backend"), | |
infer_dtype=get("infer.infer_dtype"), | |
) | |
if checkpoint_path: | |
if finetuning_type in PEFT_METHODS: # list | |
args["adapter_name_or_path"] = ",".join( | |
[get_save_dir(model_name, finetuning_type, adapter) for adapter in checkpoint_path] | |
) | |
else: # str | |
args["model_name_or_path"] = get_save_dir(model_name, finetuning_type, checkpoint_path) | |
super().__init__(args) | |
yield ALERTS["info_loaded"][lang] | |
def unload_model(self, data) -> Generator[str, None, None]: | |
lang = data[self.manager.get_elem_by_id("top.lang")] | |
if self.demo_mode: | |
gr.Warning(ALERTS["err_demo"][lang]) | |
yield ALERTS["err_demo"][lang] | |
return | |
yield ALERTS["info_unloading"][lang] | |
self.engine = None | |
torch_gc() | |
yield ALERTS["info_unloaded"][lang] | |
def append( | |
self, | |
chatbot: List[List[Optional[str]]], | |
messages: Sequence[Dict[str, str]], | |
role: str, | |
query: str, | |
) -> Tuple[List[List[Optional[str]]], List[Dict[str, str]], str]: | |
return chatbot + [[query, None]], messages + [{"role": role, "content": query}], "" | |
def stream( | |
self, | |
chatbot: List[List[Optional[str]]], | |
messages: Sequence[Dict[str, str]], | |
system: str, | |
tools: str, | |
image: Optional[Any], | |
video: Optional[Any], | |
max_new_tokens: int, | |
top_p: float, | |
temperature: float, | |
) -> Generator[Tuple[List[List[Optional[str]]], List[Dict[str, str]]], None, None]: | |
chatbot[-1][1] = "" | |
response = "" | |
for new_text in self.stream_chat( | |
messages, system, tools, image, video, max_new_tokens=max_new_tokens, top_p=top_p, temperature=temperature | |
): | |
response += new_text | |
if tools: | |
result = self.engine.template.extract_tool(response) | |
else: | |
result = response | |
if isinstance(result, list): | |
tool_calls = [{"name": tool[0], "arguments": json.loads(tool[1])} for tool in result] | |
tool_calls = json.dumps(tool_calls, indent=4, ensure_ascii=False) | |
output_messages = messages + [{"role": Role.FUNCTION.value, "content": tool_calls}] | |
bot_text = "```json\n" + tool_calls + "\n```" | |
else: | |
output_messages = messages + [{"role": Role.ASSISTANT.value, "content": result}] | |
bot_text = result | |
chatbot[-1][1] = bot_text | |
yield chatbot, output_messages | |