Spaces:
Sleeping
Sleeping
File size: 6,212 Bytes
7db0ae4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
#### What this does ####
# On success, logs events to Langfuse
import dotenv, os
import requests
import requests
from datetime import datetime
dotenv.load_dotenv() # Loading env variables using dotenv
import traceback
from packaging.version import Version
class LangFuseLogger:
# Class variables or attributes
def __init__(self):
try:
from langfuse import Langfuse
except Exception as e:
raise Exception(
f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\033[0m"
)
# Instance variables
self.secret_key = os.getenv("LANGFUSE_SECRET_KEY")
self.public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
self.langfuse_host = os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com")
self.langfuse_release = os.getenv("LANGFUSE_RELEASE")
self.langfuse_debug = os.getenv("LANGFUSE_DEBUG")
self.Langfuse = Langfuse(
public_key=self.public_key,
secret_key=self.secret_key,
host=self.langfuse_host,
release=self.langfuse_release,
debug=self.langfuse_debug,
)
def log_event(
self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
):
# Method definition
try:
print_verbose(
f"Langfuse Logging - Enters logging function for model {kwargs}"
)
litellm_params = kwargs.get("litellm_params", {})
metadata = (
litellm_params.get("metadata", {}) or {}
) # if litellm_params['metadata'] == None
prompt = [kwargs.get("messages")]
optional_params = kwargs.get("optional_params", {})
optional_params.pop("functions", None)
optional_params.pop("tools", None)
# langfuse only accepts str, int, bool, float for logging
for param, value in optional_params.items():
if not isinstance(value, (str, int, bool, float)):
try:
optional_params[param] = str(value)
except:
# if casting value to str fails don't block logging
pass
# end of processing langfuse ########################
input = prompt
output = response_obj["choices"][0]["message"].json()
print_verbose(
f"OUTPUT IN LANGFUSE: {output}; original: {response_obj['choices'][0]['message']}"
)
self._log_langfuse_v2(
user_id,
metadata,
output,
start_time,
end_time,
kwargs,
optional_params,
input,
response_obj,
) if self._is_langfuse_v2() else self._log_langfuse_v1(
user_id,
metadata,
output,
start_time,
end_time,
kwargs,
optional_params,
input,
response_obj,
)
self.Langfuse.flush()
print_verbose(
f"Langfuse Layer Logging - final response object: {response_obj}"
)
except:
traceback.print_exc()
print_verbose(f"Langfuse Layer Error - {traceback.format_exc()}")
pass
async def _async_log_event(
self, kwargs, response_obj, start_time, end_time, user_id, print_verbose
):
self.log_event(
kwargs, response_obj, start_time, end_time, user_id, print_verbose
)
def _is_langfuse_v2(self):
import langfuse
return Version(langfuse.version.__version__) >= Version("2.0.0")
def _log_langfuse_v1(
self,
user_id,
metadata,
output,
start_time,
end_time,
kwargs,
optional_params,
input,
response_obj,
):
from langfuse.model import CreateTrace, CreateGeneration
print(
"Please upgrade langfuse to v2.0.0 or higher: https://github.com/langfuse/langfuse-python/releases/tag/v2.0.1"
)
trace = self.Langfuse.trace(
CreateTrace(
name=metadata.get("generation_name", "litellm-completion"),
input=input,
output=output,
userId=user_id,
)
)
trace.generation(
CreateGeneration(
name=metadata.get("generation_name", "litellm-completion"),
startTime=start_time,
endTime=end_time,
model=kwargs["model"],
modelParameters=optional_params,
input=input,
output=output,
usage={
"prompt_tokens": response_obj["usage"]["prompt_tokens"],
"completion_tokens": response_obj["usage"]["completion_tokens"],
},
metadata=metadata,
)
)
def _log_langfuse_v2(
self,
user_id,
metadata,
output,
start_time,
end_time,
kwargs,
optional_params,
input,
response_obj,
):
trace = self.Langfuse.trace(
name=metadata.get("generation_name", "litellm-completion"),
input=input,
output=output,
user_id=metadata.get("trace_user_id", user_id),
id=metadata.get("trace_id", None),
)
trace.generation(
name=metadata.get("generation_name", "litellm-completion"),
id=metadata.get("generation_id", None),
startTime=start_time,
endTime=end_time,
model=kwargs["model"],
modelParameters=optional_params,
input=input,
output=output,
usage={
"prompt_tokens": response_obj["usage"]["prompt_tokens"],
"completion_tokens": response_obj["usage"]["completion_tokens"],
},
metadata=metadata,
)
|