Skip to content

(Refactor) Langfuse - remove prepare_metadata, langfuse python SDK now handles non-json serializable objects #7925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 23, 2025
110 changes: 24 additions & 86 deletions litellm/integrations/langfuse/langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import copy
import os
import traceback
from collections.abc import MutableMapping, MutableSequence, MutableSet
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast

from packaging.version import Version
from pydantic import BaseModel

import litellm
from litellm._logging import verbose_logger
Expand Down Expand Up @@ -71,8 +69,9 @@ def __init__(
"flush_interval": self.langfuse_flush_interval, # flush interval in seconds
"httpx_client": self.langfuse_client,
}
self.langfuse_sdk_version: str = langfuse.version.__version__

if Version(langfuse.version.__version__) >= Version("2.6.0"):
if Version(self.langfuse_sdk_version) >= Version("2.6.0"):
parameters["sdk_integration"] = "litellm"

self.Langfuse = Langfuse(**parameters)
Expand Down Expand Up @@ -360,73 +359,6 @@ def _log_langfuse_v1(
)
)

def is_base_type(self, value: Any) -> bool:
    """Return True when *value* is a plain builtin type that can be logged as-is."""
    return isinstance(value, (int, float, str, bool, list, dict, tuple))

def _prepare_metadata(self, metadata: Optional[dict]) -> Any:
try:
if metadata is None:
return None

# Filter out function types from the metadata
sanitized_metadata = {k: v for k, v in metadata.items() if not callable(v)}

return copy.deepcopy(sanitized_metadata)
except Exception as e:
verbose_logger.debug(f"Langfuse Layer Error - {e}, metadata: {metadata}")

new_metadata: Dict[str, Any] = {}

# if metadata is not a MutableMapping, return an empty dict since we can't call items() on it
if not isinstance(metadata, MutableMapping):
verbose_logger.debug(
"Langfuse Layer Logging - metadata is not a MutableMapping, returning empty dict"
)
return new_metadata

for key, value in metadata.items():
try:
if isinstance(value, MutableMapping):
new_metadata[key] = self._prepare_metadata(cast(dict, value))
elif isinstance(value, MutableSequence):
# For lists or other mutable sequences
new_metadata[key] = list(
(
self._prepare_metadata(cast(dict, v))
if isinstance(v, MutableMapping)
else copy.deepcopy(v)
)
for v in value
)
elif isinstance(value, MutableSet):
# For sets specifically, create a new set by passing an iterable
new_metadata[key] = set(
(
self._prepare_metadata(cast(dict, v))
if isinstance(v, MutableMapping)
else copy.deepcopy(v)
)
for v in value
)
elif isinstance(value, BaseModel):
new_metadata[key] = value.model_dump()
elif self.is_base_type(value):
new_metadata[key] = value
else:
verbose_logger.debug(
f"Langfuse Layer Error - Unsupported metadata type: {type(value)} for key: {key}"
)
continue

except (TypeError, copy.Error):
verbose_logger.debug(
f"Langfuse Layer Error - Couldn't copy metadata key: {key}, type of key: {type(key)}, type of value: {type(value)} - {traceback.format_exc()}"
)

return new_metadata

def _log_langfuse_v2( # noqa: PLR0915
self,
user_id,
Expand All @@ -443,27 +375,17 @@ def _log_langfuse_v2( # noqa: PLR0915
print_verbose,
litellm_call_id,
) -> tuple:
import langfuse

verbose_logger.debug("Langfuse Layer Logging - logging to langfuse v2")

try:
metadata = self._prepare_metadata(metadata)

langfuse_version = Version(langfuse.version.__version__)

supports_tags = langfuse_version >= Version("2.6.3")
supports_prompt = langfuse_version >= Version("2.7.3")
supports_costs = langfuse_version >= Version("2.7.3")
supports_completion_start_time = langfuse_version >= Version("2.7.3")

metadata = metadata or {}
standard_logging_object: Optional[StandardLoggingPayload] = cast(
Optional[StandardLoggingPayload],
kwargs.get("standard_logging_object", None),
)
tags = (
self._get_langfuse_tags(standard_logging_object=standard_logging_object)
if supports_tags
if self._supports_tags()
else []
)

Expand Down Expand Up @@ -624,7 +546,7 @@ def _log_langfuse_v2( # noqa: PLR0915
if aws_region_name:
clean_metadata["aws_region_name"] = aws_region_name

if supports_tags:
if self._supports_tags():
if "cache_hit" in kwargs:
if kwargs["cache_hit"] is None:
kwargs["cache_hit"] = False
Expand Down Expand Up @@ -670,7 +592,7 @@ def _log_langfuse_v2( # noqa: PLR0915
usage = {
"prompt_tokens": _usage_obj.prompt_tokens,
"completion_tokens": _usage_obj.completion_tokens,
"total_cost": cost if supports_costs else None,
"total_cost": cost if self._supports_costs() else None,
}
generation_name = clean_metadata.pop("generation_name", None)
if generation_name is None:
Expand Down Expand Up @@ -713,7 +635,7 @@ def _log_langfuse_v2( # noqa: PLR0915
if parent_observation_id is not None:
generation_params["parent_observation_id"] = parent_observation_id

if supports_prompt:
if self._supports_prompt():
generation_params = _add_prompt_to_generation_params(
generation_params=generation_params,
clean_metadata=clean_metadata,
Expand All @@ -723,7 +645,7 @@ def _log_langfuse_v2( # noqa: PLR0915
if output is not None and isinstance(output, str) and level == "ERROR":
generation_params["status_message"] = output

if supports_completion_start_time:
if self._supports_completion_start_time():
generation_params["completion_start_time"] = kwargs.get(
"completion_start_time", None
)
Expand Down Expand Up @@ -770,6 +692,22 @@ def add_default_langfuse_tags(self, tags, kwargs, metadata):
tags.append(f"cache_key:{_cache_key}")
return tags

def _supports_tags(self):
    """True when the installed langfuse SDK (>= 2.6.3) accepts tags."""
    installed = Version(self.langfuse_sdk_version)
    return installed >= Version("2.6.3")

def _supports_prompt(self):
    """True when the installed langfuse SDK (>= 2.7.3) accepts prompt params."""
    installed = Version(self.langfuse_sdk_version)
    return installed >= Version("2.7.3")

def _supports_costs(self):
    """True when the installed langfuse SDK (>= 2.7.3) accepts cost data."""
    installed = Version(self.langfuse_sdk_version)
    return installed >= Version("2.7.3")

def _supports_completion_start_time(self):
    """True when the installed langfuse SDK (>= 2.7.3) accepts completion start time."""
    installed = Version(self.langfuse_sdk_version)
    return installed >= Version("2.7.3")


def _add_prompt_to_generation_params(
generation_params: dict,
Expand Down
3 changes: 3 additions & 0 deletions litellm/integrations/langfuse/langfuse_prompt_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ def __init__(
langfuse_host=None,
flush_interval=1,
):
import langfuse

self.langfuse_sdk_version = langfuse.version.__version__
self.Langfuse = langfuse_client_init(
langfuse_public_key=langfuse_public_key,
langfuse_secret=langfuse_secret,
Expand Down
1 change: 0 additions & 1 deletion tests/code_coverage_tests/recursive_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
"text_completion",
"_check_for_os_environ_vars",
"clean_message",
"_prepare_metadata",
"unpack_defs",
"convert_to_nullable",
"add_object_type",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
{
"batch": [
{
"id": "9ee9100b-c4aa-4e40-a10d-bc189f8b4242",
"type": "trace-create",
"body": {
"id": "litellm-test-c414db10-dd68-406e-9d9e-03839bc2f346",
"timestamp": "2025-01-22T17:27:51.702596Z",
"name": "litellm-acompletion",
"input": {
"messages": [
{
"role": "user",
"content": "Hello!"
}
]
},
"output": {
"content": "Hello! How can I assist you today?",
"role": "assistant",
"tool_calls": null,
"function_call": null
},
"tags": []
},
"timestamp": "2025-01-22T17:27:51.702716Z"
},
{
"id": "f8d20489-ed58-429f-b609-87380e223746",
"type": "generation-create",
"body": {
"traceId": "litellm-test-c414db10-dd68-406e-9d9e-03839bc2f346",
"name": "litellm-acompletion",
"startTime": "2025-01-22T09:27:51.150898-08:00",
"metadata": {
"string_value": "hello",
"int_value": 42,
"float_value": 3.14,
"bool_value": true,
"nested_dict": {
"key1": "value1",
"key2": {
"inner_key": "inner_value"
}
},
"list_value": [
1,
2,
3
],
"set_value": [
1,
2,
3
],
"complex_list": [
{
"dict_in_list": "value"
},
"simple_string",
[
1,
2,
3
]
],
"user": {
"name": "John",
"age": 30,
"tags": [
"customer",
"active"
]
},
"hidden_params": {
"model_id": null,
"cache_key": null,
"api_base": "https://api.openai.com",
"response_cost": 5.4999999999999995e-05,
"additional_headers": {},
"litellm_overhead_time_ms": null
},
"litellm_response_cost": 5.4999999999999995e-05,
"cache_hit": false,
"requester_metadata": {}
},
"input": {
"messages": [
{
"role": "user",
"content": "Hello!"
}
]
},
"output": {
"content": "Hello! How can I assist you today?",
"role": "assistant",
"tool_calls": null,
"function_call": null
},
"level": "DEFAULT",
"id": "time-09-27-51-150898_chatcmpl-b783291c-dc76-4660-bfef-b79be9d54e57",
"endTime": "2025-01-22T09:27:51.702048-08:00",
"completionStartTime": "2025-01-22T09:27:51.702048-08:00",
"model": "gpt-3.5-turbo",
"modelParameters": {
"extra_body": "{}"
},
"usage": {
"input": 10,
"output": 20,
"unit": "TOKENS",
"totalCost": 5.4999999999999995e-05
}
},
"timestamp": "2025-01-22T17:27:51.703046Z"
}
],
"metadata": {
"batch_size": 2,
"sdk_integration": "litellm",
"sdk_name": "python",
"sdk_version": "2.44.1",
"public_key": "pk-lf-e02aaea3-8668-4c9f-8c69-771a4ea1f5c9"
}
}
Loading
Loading