Spaces:
Sleeping
Sleeping
File size: 7,799 Bytes
7db0ae4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
import os, json, time
import litellm
from litellm.utils import ModelResponse
import requests, threading
from typing import Optional, Union, Literal
class BudgetManager:
def __init__(
self,
project_name: str,
client_type: str = "local",
api_base: Optional[str] = None,
):
self.client_type = client_type
self.project_name = project_name
self.api_base = api_base or "https://api.litellm.ai"
## load the data or init the initial dictionaries
self.load_data()
def print_verbose(self, print_statement):
try:
if litellm.set_verbose:
import logging
logging.info(print_statement)
except:
pass
def load_data(self):
if self.client_type == "local":
# Check if user dict file exists
if os.path.isfile("user_cost.json"):
# Load the user dict
with open("user_cost.json", "r") as json_file:
self.user_dict = json.load(json_file)
else:
self.print_verbose("User Dictionary not found!")
self.user_dict = {}
self.print_verbose(f"user dict from local: {self.user_dict}")
elif self.client_type == "hosted":
# Load the user_dict from hosted db
url = self.api_base + "/get_budget"
headers = {"Content-Type": "application/json"}
data = {"project_name": self.project_name}
response = requests.post(url, headers=headers, json=data)
response = response.json()
if response["status"] == "error":
self.user_dict = (
{}
) # assume this means the user dict hasn't been stored yet
else:
self.user_dict = response["data"]
def create_budget(
self,
total_budget: float,
user: str,
duration: Optional[Literal["daily", "weekly", "monthly", "yearly"]] = None,
created_at: float = time.time(),
):
self.user_dict[user] = {"total_budget": total_budget}
if duration is None:
return self.user_dict[user]
if duration == "daily":
duration_in_days = 1
elif duration == "weekly":
duration_in_days = 7
elif duration == "monthly":
duration_in_days = 28
elif duration == "yearly":
duration_in_days = 365
else:
raise ValueError(
"""duration needs to be one of ["daily", "weekly", "monthly", "yearly"]"""
)
self.user_dict[user] = {
"total_budget": total_budget,
"duration": duration_in_days,
"created_at": created_at,
"last_updated_at": created_at,
}
self._save_data_thread() # [Non-Blocking] Update persistent storage without blocking execution
return self.user_dict[user]
def projected_cost(self, model: str, messages: list, user: str):
text = "".join(message["content"] for message in messages)
prompt_tokens = litellm.token_counter(model=model, text=text)
prompt_cost, _ = litellm.cost_per_token(
model=model, prompt_tokens=prompt_tokens, completion_tokens=0
)
current_cost = self.user_dict[user].get("current_cost", 0)
projected_cost = prompt_cost + current_cost
return projected_cost
def get_total_budget(self, user: str):
return self.user_dict[user]["total_budget"]
def update_cost(
self,
user: str,
completion_obj: Optional[ModelResponse] = None,
model: Optional[str] = None,
input_text: Optional[str] = None,
output_text: Optional[str] = None,
):
if model and input_text and output_text:
prompt_tokens = litellm.token_counter(
model=model, messages=[{"role": "user", "content": input_text}]
)
completion_tokens = litellm.token_counter(
model=model, messages=[{"role": "user", "content": output_text}]
)
(
prompt_tokens_cost_usd_dollar,
completion_tokens_cost_usd_dollar,
) = litellm.cost_per_token(
model=model,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
)
cost = prompt_tokens_cost_usd_dollar + completion_tokens_cost_usd_dollar
elif completion_obj:
cost = litellm.completion_cost(completion_response=completion_obj)
model = completion_obj[
"model"
] # if this throws an error try, model = completion_obj['model']
else:
raise ValueError(
"Either a chat completion object or the text response needs to be passed in. Learn more - https://docs.litellm.ai/docs/budget_manager"
)
self.user_dict[user]["current_cost"] = cost + self.user_dict[user].get(
"current_cost", 0
)
if "model_cost" in self.user_dict[user]:
self.user_dict[user]["model_cost"][model] = cost + self.user_dict[user][
"model_cost"
].get(model, 0)
else:
self.user_dict[user]["model_cost"] = {model: cost}
self._save_data_thread() # [Non-Blocking] Update persistent storage without blocking execution
return {"user": self.user_dict[user]}
def get_current_cost(self, user):
return self.user_dict[user].get("current_cost", 0)
def get_model_cost(self, user):
return self.user_dict[user].get("model_cost", 0)
def is_valid_user(self, user: str) -> bool:
return user in self.user_dict
def get_users(self):
return list(self.user_dict.keys())
def reset_cost(self, user):
self.user_dict[user]["current_cost"] = 0
self.user_dict[user]["model_cost"] = {}
return {"user": self.user_dict[user]}
def reset_on_duration(self, user: str):
# Get current and creation time
last_updated_at = self.user_dict[user]["last_updated_at"]
current_time = time.time()
# Convert duration from days to seconds
duration_in_seconds = self.user_dict[user]["duration"] * 24 * 60 * 60
# Check if duration has elapsed
if current_time - last_updated_at >= duration_in_seconds:
# Reset cost if duration has elapsed and update the creation time
self.reset_cost(user)
self.user_dict[user]["last_updated_at"] = current_time
self._save_data_thread() # Save the data
def update_budget_all_users(self):
for user in self.get_users():
if "duration" in self.user_dict[user]:
self.reset_on_duration(user)
def _save_data_thread(self):
thread = threading.Thread(
target=self.save_data
) # [Non-Blocking]: saves data without blocking execution
thread.start()
def save_data(self):
if self.client_type == "local":
import json
# save the user dict
with open("user_cost.json", "w") as json_file:
json.dump(
self.user_dict, json_file, indent=4
) # Indent for pretty formatting
return {"status": "success"}
elif self.client_type == "hosted":
url = self.api_base + "/set_budget"
headers = {"Content-Type": "application/json"}
data = {"project_name": self.project_name, "user_dict": self.user_dict}
response = requests.post(url, headers=headers, json=data)
response = response.json()
return response
|