litellmlope / litellm /budget_manager.py
ka1kuk's picture
Upload 235 files
7db0ae4 verified
raw
history blame
7.8 kB
import os, json, time
import litellm
from litellm.utils import ModelResponse
import requests, threading
from typing import Optional, Union, Literal
class BudgetManager:
def __init__(
self,
project_name: str,
client_type: str = "local",
api_base: Optional[str] = None,
):
self.client_type = client_type
self.project_name = project_name
self.api_base = api_base or "https://api.litellm.ai"
## load the data or init the initial dictionaries
self.load_data()
def print_verbose(self, print_statement):
try:
if litellm.set_verbose:
import logging
logging.info(print_statement)
except:
pass
def load_data(self):
if self.client_type == "local":
# Check if user dict file exists
if os.path.isfile("user_cost.json"):
# Load the user dict
with open("user_cost.json", "r") as json_file:
self.user_dict = json.load(json_file)
else:
self.print_verbose("User Dictionary not found!")
self.user_dict = {}
self.print_verbose(f"user dict from local: {self.user_dict}")
elif self.client_type == "hosted":
# Load the user_dict from hosted db
url = self.api_base + "/get_budget"
headers = {"Content-Type": "application/json"}
data = {"project_name": self.project_name}
response = requests.post(url, headers=headers, json=data)
response = response.json()
if response["status"] == "error":
self.user_dict = (
{}
) # assume this means the user dict hasn't been stored yet
else:
self.user_dict = response["data"]
def create_budget(
self,
total_budget: float,
user: str,
duration: Optional[Literal["daily", "weekly", "monthly", "yearly"]] = None,
created_at: float = time.time(),
):
self.user_dict[user] = {"total_budget": total_budget}
if duration is None:
return self.user_dict[user]
if duration == "daily":
duration_in_days = 1
elif duration == "weekly":
duration_in_days = 7
elif duration == "monthly":
duration_in_days = 28
elif duration == "yearly":
duration_in_days = 365
else:
raise ValueError(
"""duration needs to be one of ["daily", "weekly", "monthly", "yearly"]"""
)
self.user_dict[user] = {
"total_budget": total_budget,
"duration": duration_in_days,
"created_at": created_at,
"last_updated_at": created_at,
}
self._save_data_thread() # [Non-Blocking] Update persistent storage without blocking execution
return self.user_dict[user]
def projected_cost(self, model: str, messages: list, user: str):
text = "".join(message["content"] for message in messages)
prompt_tokens = litellm.token_counter(model=model, text=text)
prompt_cost, _ = litellm.cost_per_token(
model=model, prompt_tokens=prompt_tokens, completion_tokens=0
)
current_cost = self.user_dict[user].get("current_cost", 0)
projected_cost = prompt_cost + current_cost
return projected_cost
def get_total_budget(self, user: str):
return self.user_dict[user]["total_budget"]
def update_cost(
self,
user: str,
completion_obj: Optional[ModelResponse] = None,
model: Optional[str] = None,
input_text: Optional[str] = None,
output_text: Optional[str] = None,
):
if model and input_text and output_text:
prompt_tokens = litellm.token_counter(
model=model, messages=[{"role": "user", "content": input_text}]
)
completion_tokens = litellm.token_counter(
model=model, messages=[{"role": "user", "content": output_text}]
)
(
prompt_tokens_cost_usd_dollar,
completion_tokens_cost_usd_dollar,
) = litellm.cost_per_token(
model=model,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
)
cost = prompt_tokens_cost_usd_dollar + completion_tokens_cost_usd_dollar
elif completion_obj:
cost = litellm.completion_cost(completion_response=completion_obj)
model = completion_obj[
"model"
] # if this throws an error try, model = completion_obj['model']
else:
raise ValueError(
"Either a chat completion object or the text response needs to be passed in. Learn more - https://docs.litellm.ai/docs/budget_manager"
)
self.user_dict[user]["current_cost"] = cost + self.user_dict[user].get(
"current_cost", 0
)
if "model_cost" in self.user_dict[user]:
self.user_dict[user]["model_cost"][model] = cost + self.user_dict[user][
"model_cost"
].get(model, 0)
else:
self.user_dict[user]["model_cost"] = {model: cost}
self._save_data_thread() # [Non-Blocking] Update persistent storage without blocking execution
return {"user": self.user_dict[user]}
def get_current_cost(self, user):
return self.user_dict[user].get("current_cost", 0)
def get_model_cost(self, user):
return self.user_dict[user].get("model_cost", 0)
def is_valid_user(self, user: str) -> bool:
return user in self.user_dict
def get_users(self):
return list(self.user_dict.keys())
def reset_cost(self, user):
self.user_dict[user]["current_cost"] = 0
self.user_dict[user]["model_cost"] = {}
return {"user": self.user_dict[user]}
def reset_on_duration(self, user: str):
# Get current and creation time
last_updated_at = self.user_dict[user]["last_updated_at"]
current_time = time.time()
# Convert duration from days to seconds
duration_in_seconds = self.user_dict[user]["duration"] * 24 * 60 * 60
# Check if duration has elapsed
if current_time - last_updated_at >= duration_in_seconds:
# Reset cost if duration has elapsed and update the creation time
self.reset_cost(user)
self.user_dict[user]["last_updated_at"] = current_time
self._save_data_thread() # Save the data
def update_budget_all_users(self):
for user in self.get_users():
if "duration" in self.user_dict[user]:
self.reset_on_duration(user)
def _save_data_thread(self):
thread = threading.Thread(
target=self.save_data
) # [Non-Blocking]: saves data without blocking execution
thread.start()
def save_data(self):
if self.client_type == "local":
import json
# save the user dict
with open("user_cost.json", "w") as json_file:
json.dump(
self.user_dict, json_file, indent=4
) # Indent for pretty formatting
return {"status": "success"}
elif self.client_type == "hosted":
url = self.api_base + "/set_budget"
headers = {"Content-Type": "application/json"}
data = {"project_name": self.project_name, "user_dict": self.user_dict}
response = requests.post(url, headers=headers, json=data)
response = response.json()
return response