# -*- coding: utf-8 -*-
# Copyright (c) 2024 OSU Natural Language Processing Group
#
# Licensed under the OpenRAIL-S License;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.licenses.ai/ai-pubs-open-rails-vz1
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import os
import time

import backoff
import litellm
import requests
from dotenv import load_dotenv
from openai import (
    APIConnectionError,
    APIError,
    RateLimitError,
)

EMPTY_API_KEY = "Your API KEY Here"


def load_openai_api_key():
    load_dotenv()
    assert (
        os.getenv("OPENAI_API_KEY") is not None
        and os.getenv("OPENAI_API_KEY") != EMPTY_API_KEY
    ), "must pass on the api_key or set OPENAI_API_KEY in the environment"
    return os.getenv("OPENAI_API_KEY")


def load_gemini_api_key():
    load_dotenv()
    assert (
        os.getenv("GEMINI_API_KEY") is not None
        and os.getenv("GEMINI_API_KEY") != EMPTY_API_KEY
    ), "must pass on the api_key or set GEMINI_API_KEY in the environment"
    return os.getenv("GEMINI_API_KEY")


def encode_image(image_path):
    """Read an image file and return its contents as a base64-encoded string."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")


def engine_factory(api_key=None, model=None, **kwargs):
    model = model.lower()
    if model in ["gpt-4-vision-preview", "gpt-4-turbo", "gpt-4o"]:
        if api_key and api_key != EMPTY_API_KEY:
            os.environ["OPENAI_API_KEY"] = api_key
        else:
            load_openai_api_key()
        return OpenAIEngine(model=model, **kwargs)
    elif model in ["gemini-1.5-pro-latest", "gemini-1.5-flash"]:
        if api_key and api_key != EMPTY_API_KEY:
            os.environ["GEMINI_API_KEY"] = api_key
        else:
            load_gemini_api_key()
        # LiteLLM routes Gemini requests via the "gemini/" provider prefix.
        model = f"gemini/{model}"
        return GeminiEngine(model=model, **kwargs)
    elif model == "llava":
        return OllamaEngine(model=model, **kwargs)
    raise ValueError(
        f"Unsupported model: {model}. Currently supported models: "
        "gpt-4-vision-preview, gpt-4-turbo, gpt-4o, "
        "gemini-1.5-pro-latest, gemini-1.5-flash, llava"
    )
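# A minimal usage sketch of the factory (illustrative only: the prompt strings,
# screenshot path, and rate limit below are placeholders, not values shipped
# with this module). Vision engines expect `prompt` as a triple of
# (system prompt, first user prompt, follow-up prompt):
#
#   engine = engine_factory(model="gpt-4o", rate_limit=10)
#   answer = engine.generate(
#       prompt=["<system prompt>", "<first user prompt>", "<follow-up prompt>"],
#       image_path="screenshot.jpg",
#       turn_number=0,
#   )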
""" self.time_slots = [0] self.stop = stop self.temperature = temperature self.model = model # convert rate limit to minmum request interval self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit self.next_avil_time = [0] * len(self.time_slots) self.current_key_idx = 0 print(f"Initializing model {self.model}") def tokenize(self, input): return self.tokenizer(input) class OllamaEngine(Engine): def __init__(self, **kwargs) -> None: """ Init an Ollama engine To use Ollama, dowload and install Ollama from https://ollama.com/ After Ollama start, pull llava with command: ollama pull llava """ super().__init__(**kwargs) self.api_url = "http://localhost:11434/api/chat" def generate(self, prompt: list = None, max_new_tokens=4096, temperature=None, model=None, image_path=None, ouput_0=None, turn_number=0, **kwargs): self.current_key_idx = (self.current_key_idx + 1) % len(self.time_slots) start_time = time.time() if ( self.request_interval > 0 and start_time < self.next_avil_time[self.current_key_idx] ): wait_time = self.next_avil_time[self.current_key_idx] - start_time print(f"Wait {wait_time} for rate limitting") time.sleep(wait_time) prompt0, prompt1, prompt2 = prompt base64_image = encode_image(image_path) if turn_number == 0: # Assume one turn dialogue prompt_input = [ {"role": "assistant", "content": prompt0}, {"role": "user", "content": prompt1, "images": [f"{base64_image}"]}, ] elif turn_number == 1: prompt_input = [ {"role": "assistant", "content": prompt0}, {"role": "user", "content": prompt1, "images": [f"{base64_image}"]}, {"role": "assistant", "content": f"\n\n{ouput_0}"}, {"role": "user", "content": prompt2}, ] options = {"temperature": self.temperature, "num_predict": max_new_tokens} data = { "model": self.model, "messages": prompt_input, "options": options, "stream": False, } _request = { "url": f"{self.api_url}", "json": data, } response = requests.post(**_request) # type: ignore if response.status_code != 200: raise Exception(f"Ollama API Error: {response.status_code}, {response.text}") response_json = response.json() return response_json["message"]["content"] class GeminiEngine(Engine): def __init__(self, **kwargs) -> None: """ Init a Gemini engine To use this engine, please provide the GEMINI_API_KEY in the environment Supported Model Rate Limit gemini-1.5-pro-latest 2 queries per minute, 1000 queries per day """ super().__init__(**kwargs) def generate(self, prompt: list = None, max_new_tokens=4096, temperature=None, model=None, image_path=None, ouput_0=None, turn_number=0, **kwargs): self.current_key_idx = (self.current_key_idx + 1) % len(self.time_slots) start_time = time.time() if ( self.request_interval > 0 and start_time < self.next_avil_time[self.current_key_idx] ): wait_time = self.next_avil_time[self.current_key_idx] - start_time print(f"Wait {wait_time} for rate limitting") prompt0, prompt1, prompt2 = prompt litellm.set_verbose=True base64_image = encode_image(image_path) if turn_number == 0: # Assume one turn dialogue prompt_input = [ {"role": "system", "content": prompt0}, {"role": "user", "content": [{"type": "text", "text": prompt1}, {"type": "image_url", "image_url": {"url": image_path, "detail": "high"}, }]}, ] elif turn_number == 1: prompt_input = [ {"role": "system", "content": prompt0}, {"role": "user", "content": [{"type": "text", "text": prompt1}, {"type": "image_url", "image_url": {"url": image_path, "detail": "high"}, }]}, {"role": "assistant", "content": [{"type": "text", "text": f"\n\n{ouput_0}"}]}, {"role": "user", "content": [{"type": 
"text", "text": prompt2}]}, ] response = litellm.completion( model=model if model else self.model, messages=prompt_input, max_tokens=max_new_tokens if max_new_tokens else 4096, temperature=temperature if temperature else self.temperature, **kwargs, ) return [choice["message"]["content"] for choice in response.choices][0] class OpenAIEngine(Engine): def __init__(self, **kwargs) -> None: """ Init an OpenAI GPT/Codex engine To find your OpenAI API key, visit https://platform.openai.com/api-keys """ super().__init__(**kwargs) @backoff.on_exception( backoff.expo, (APIError, RateLimitError, APIConnectionError), ) def generate(self, prompt: list = None, max_new_tokens=4096, temperature=None, model=None, image_path=None, ouput_0=None, turn_number=0, **kwargs): self.current_key_idx = (self.current_key_idx + 1) % len(self.time_slots) start_time = time.time() if ( self.request_interval > 0 and start_time < self.next_avil_time[self.current_key_idx] ): time.sleep(self.next_avil_time[self.current_key_idx] - start_time) prompt0, prompt1, prompt2 = prompt # litellm.set_verbose=True base64_image = encode_image(image_path) if turn_number == 0: # Assume one turn dialogue prompt_input = [ {"role": "system", "content": [{"type": "text", "text": prompt0}]}, {"role": "user", "content": [{"type": "text", "text": prompt1}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}", "detail": "high"}, }]}, ] elif turn_number == 1: prompt_input = [ {"role": "system", "content": [{"type": "text", "text": prompt0}]}, {"role": "user", "content": [{"type": "text", "text": prompt1}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}", "detail": "high"}, }]}, {"role": "assistant", "content": [{"type": "text", "text": f"\n\n{ouput_0}"}]}, {"role": "user", "content": [{"type": "text", "text": prompt2}]}, ] response = litellm.completion( model=model if model else self.model, messages=prompt_input, max_tokens=max_new_tokens if max_new_tokens else 4096, temperature=temperature if temperature else self.temperature, **kwargs, ) return [choice["message"]["content"] for choice in response.choices][0] class OpenaiEngine_MindAct(Engine): def __init__(self, **kwargs) -> None: """Init an OpenAI GPT/Codex engine Args: api_key (_type_, optional): Auth key from OpenAI. Defaults to None. stop (list, optional): Tokens indicate stop of sequence. Defaults to ["\n"]. rate_limit (int, optional): Max number of requests per minute. Defaults to -1. model (_type_, optional): Model family. Defaults to None. """ super().__init__(**kwargs) # @backoff.on_exception( backoff.expo, (APIError, RateLimitError, APIConnectionError), ) def generate(self, prompt, max_new_tokens=50, temperature=0, model=None, **kwargs): self.current_key_idx = (self.current_key_idx + 1) % len(self.time_slots) start_time = time.time() if ( self.request_interval > 0 and start_time < self.next_avil_time[self.current_key_idx] ): time.sleep(self.next_avil_time[self.current_key_idx] - start_time) if isinstance(prompt, str): # Assume one turn dialogue prompt = [ {"role": "user", "content": prompt}, ] response = litellm.completion( model=model if model else self.model, messages=prompt, max_tokens=max_new_tokens, temperature=temperature, **kwargs, ) if self.request_interval > 0: self.next_avil_time[self.current_key_idx] = ( max(start_time, self.next_avil_time[self.current_key_idx]) + self.request_interval ) return [choice["message"]["content"] for choice in response["choices"]]