Skip to content

Clement/eng 2181 otel exporter #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions literalai/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import json
import os
from traceloop.sdk import Traceloop
from typing import Any, Dict, List, Optional, Union
from typing_extensions import deprecated
import io
from contextlib import redirect_stdout

from literalai.api import AsyncLiteralAPI, LiteralAPI
from literalai.callback.langchain_callback import get_langchain_callback
Expand All @@ -10,6 +15,7 @@
experiment_item_run_decorator,
)
from literalai.event_processor import EventProcessor
from literalai.exporter import LoggingSpanExporter
from literalai.instrumentation.mistralai import instrument_mistralai
from literalai.instrumentation.openai import instrument_openai
from literalai.my_types import Environment
Expand Down Expand Up @@ -92,18 +98,21 @@ def to_sync(self) -> "LiteralClient":
else:
return self # type: ignore

@deprecated("Use Literal.initialize instead")
def instrument_openai(self):
    """
    Instruments the OpenAI SDK so that all LLM calls are logged to Literal AI.

    .. deprecated::
        Use ``Literal.initialize`` instead, which routes LLM calls through
        the OpenTelemetry exporter rather than patching the SDK directly.
    """
    # Instrumentation is applied on the synchronous client so that both
    # sync and async variants share the same patched SDK.
    instrument_openai(self.to_sync())

@deprecated("Use Literal.initialize instead")
def instrument_mistralai(self):
    """
    Instruments the Mistral AI SDK so that all LLM calls are logged to Literal AI.

    .. deprecated::
        Use ``Literal.initialize`` instead, which routes LLM calls through
        the OpenTelemetry exporter rather than patching the SDK directly.
    """
    # Instrumentation is applied on the synchronous client so that both
    # sync and async variants share the same patched SDK.
    instrument_mistralai(self.to_sync())

@deprecated("Use Literal.initialize instead")
def instrument_llamaindex(self):
"""
Instruments the Llama Index framework so that all RAG & LLM calls are logged to Literal AI.
Expand All @@ -119,6 +128,13 @@ def instrument_llamaindex(self):

instrument_llamaindex(self.to_sync())

def initialize(self):
    """
    Initializes OpenTelemetry instrumentation via Traceloop, wiring a
    ``LoggingSpanExporter`` bound to this client's event processor so
    that instrumented LLM spans are logged to Literal AI.
    """
    # Traceloop.init is verbose on stdout; swallow its output so
    # initialization stays quiet. NOTE(review): this also hides any
    # other print-based diagnostics emitted during init.
    with redirect_stdout(io.StringIO()):
        Traceloop.init(
            # Export each span immediately rather than batching.
            disable_batch=True,
            exporter=LoggingSpanExporter(event_processor=self.event_processor),
        )

def langchain_callback(
self,
to_ignore: Optional[List[str]] = None,
Expand Down Expand Up @@ -352,6 +368,27 @@ def get_current_root_run(self):
"""
return active_root_run_var.get()

def set_properties(
    self,
    name: Optional[str] = None,
    tags: Optional[List[str]] = None,
    metadata: Optional[Dict[str, Any]] = None,
):
    """
    Attaches the current Literal AI context (active thread, parent step,
    root run) plus the given name/tags/metadata to subsequent Traceloop
    spans via association properties.

    Traceloop replaces the association properties wholesale on each call
    (it is not an upsert), so the full property set is re-sent every time.
    Traceloop can only store strings, hence absent values are encoded as
    the literal string "None".
    """
    current_thread = active_thread_var.get()
    current_root_run = active_root_run_var.get()
    active_steps = active_steps_var.get()
    current_parent = active_steps[-1] if active_steps else None

    properties = {
        "literal.thread_id": str(current_thread.id) if current_thread else "None",
        "literal.parent_id": str(current_parent.id) if current_parent else "None",
        "literal.root_run_id": str(current_root_run.id) if current_root_run else "None",
        "literal.name": str(name) if name else "None",
        "literal.tags": json.dumps(tags) if tags else "None",
        "literal.metadata": json.dumps(metadata) if metadata else "None",
    }
    Traceloop.set_association_properties(properties)

def reset_context(self):
"""
Resets the context, forgetting active steps & setting current thread to None.
Expand Down
1 change: 1 addition & 0 deletions literalai/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def _process_batch(self, batch: List):
self.processing_counter -= len(batch)

def flush_and_stop(self, grace_period: float = 4.0):
    """
    Flush pending events and stop the background processing thread.

    Args:
        grace_period: Seconds to wait before signalling shutdown, giving
            in-flight span exports time to enqueue their final events.
            Defaults to 4.0 (the previous hard-coded sleep).
            TODO(review): replace this fixed sleep with an explicit
            "queue drained" signal from the exporter.
    """
    # Only wait when a processing thread is actually draining events;
    # sleeping while the processor is disabled serves no purpose.
    if not self.disabled:
        time.sleep(grace_period)
    self.stop_event.set()
    if not self.disabled:
        self.processing_thread.join()
Expand Down
209 changes: 209 additions & 0 deletions literalai/exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
from datetime import datetime, timezone
import json
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from typing import Dict, List, Optional, Sequence, cast
import logging


from literalai.event_processor import EventProcessor
from literalai.helper import utc_now
from literalai.observability.generation import GenerationType
from literalai.observability.step import Step, StepDict


class LoggingSpanExporter(SpanExporter):
    """OpenTelemetry span exporter that forwards LLM spans to Literal AI.

    Spans carrying a ``gen_ai.request.model`` attribute (i.e. spans produced
    by Traceloop's LLM instrumentation) are converted into Literal AI ``Step``
    objects and pushed onto the event processor. All other spans are ignored.

    TODO: generation promptId and variables; ttFirstToken / duration /
    tokenThroughputInSeconds (no clear span-attribute equivalent found);
    tool calls; error handling for gemini. Attributes such as
    frequency_penalty are only present when the upstream instrumentation
    records them.
    """

    # Prefix under which Traceloop exposes the association properties set
    # via Traceloop.set_association_properties in the client.
    _ASSOCIATION_PREFIX = "traceloop.association.properties.literal."

    def __init__(
        self,
        logger_name: str = "span_exporter",
        event_processor: Optional[EventProcessor] = None,
    ):
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        self.event_processor = event_processor

        # Attach a stream handler only once so repeated instantiation does
        # not duplicate log output.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter(
                    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
                )
            )
            self.logger.addHandler(handler)

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        """Export the spans: convert LLM spans to Steps and enqueue them.

        Any exception is logged and reported as FAILURE so a single bad
        span cannot crash the OpenTelemetry export pipeline.
        """
        try:
            for span in spans:
                if (
                    span.attributes
                    and span.attributes.get("gen_ai.request.model", None) is not None
                    and self.event_processor is not None
                ):
                    step = self._create_step_from_span(span)
                    self.event_processor.add_event(cast(StepDict, step.to_dict()))

            return SpanExportResult.SUCCESS
        except Exception as e:
            self.logger.error(f"Failed to export spans: {e}")
            return SpanExportResult.FAILURE

    def shutdown(self):
        """Shuts down the exporter, flushing the event processor if present."""
        if self.event_processor is not None:
            return self.event_processor.flush_and_stop()

    def force_flush(self, timeout_millis: float = 30000) -> bool:
        """Force flush the exporter. Events are enqueued synchronously in
        export(), so there is nothing additional to flush here."""
        return True

    def _create_step_from_span(self, span: ReadableSpan) -> Step:
        """Convert a ReadableSpan into a Literal AI Step.

        Reads the gen_ai.* / llm.* semantic attributes for generation
        content and settings, and the Traceloop association properties for
        Literal AI context (thread, parent, root run, name, tags, metadata).
        """
        attributes = span.attributes or {}

        # Span timestamps are nanoseconds since epoch; fall back to "now"
        # when missing.
        start_time = (
            datetime.fromtimestamp(span.start_time / 1e9, tz=timezone.utc).isoformat()
            if span.start_time
            else utc_now()
        )
        end_time = (
            datetime.fromtimestamp(span.end_time / 1e9, tz=timezone.utc).isoformat()
            if span.end_time
            else utc_now()
        )

        generation_type = attributes.get("llm.request.type")
        is_chat = generation_type == "chat"

        # Literal AI context propagated through Traceloop association
        # properties. Values arrive as strings; the sentinel string "None"
        # marks an absent value (Traceloop can only store strings), so both
        # None and "None" are filtered out.
        raw_props = {
            key: attributes.get(self._ASSOCIATION_PREFIX + key)
            for key in (
                "parent_id",
                "thread_id",
                "root_run_id",
                "metadata",
                "tags",
                "name",
            )
        }
        span_props = {
            k: str(v) for k, v in raw_props.items() if v is not None and v != "None"
        }

        # Guard against chat spans that report no completion messages; an
        # unconditional [0] would raise IndexError on them.
        completion_messages = (
            self.extract_messages(cast(Dict, attributes), "gen_ai.completion.")
            if is_chat
            else []
        )
        generation_content = {
            "messages": (
                self.extract_messages(cast(Dict, attributes)) if is_chat else None
            ),
            "message_completion": (
                completion_messages[0] if completion_messages else None
            ),
            "prompt": attributes.get("gen_ai.prompt.0.user"),
            "completion": attributes.get("gen_ai.completion.0.content"),
            "model": attributes.get("gen_ai.request.model"),
            "provider": attributes.get("gen_ai.system"),
        }
        generation_settings = {
            "max_tokens": attributes.get("gen_ai.request.max_tokens"),
            "stream": attributes.get("llm.is_streaming"),
            "token_count": attributes.get("llm.usage.total_tokens"),
            "input_token_count": attributes.get("gen_ai.usage.prompt_tokens"),
            "output_token_count": attributes.get("gen_ai.usage.completion_tokens"),
            "frequency_penalty": attributes.get("gen_ai.request.frequency_penalty"),
            "presence_penalty": attributes.get("gen_ai.request.presence_penalty"),
            "temperature": attributes.get("gen_ai.request.temperature"),
            "top_p": attributes.get("gen_ai.request.top_p"),
        }

        step_dict = {
            # NOTE(review): span_id is an int, stringified here — confirm
            # the backend accepts non-UUID step ids.
            "id": str(span.context.span_id) if span.context else None,
            "name": span_props.get("name", span.name),
            "type": "llm",
            "metadata": self.extract_json(span_props.get("metadata", "{}")),
            "startTime": start_time,
            "endTime": end_time,
            "threadId": span_props.get("thread_id"),
            "parentId": span_props.get("parent_id"),
            "rootRunId": span_props.get("root_run_id"),
            "tags": self.extract_json(span_props.get("tags", "[]")),
            "input": {
                "content": (
                    generation_content["messages"]
                    if is_chat
                    else generation_content["prompt"]
                )
            },
            "output": {
                "content": (
                    generation_content["message_completion"]
                    if is_chat
                    else generation_content["completion"]
                )
            },
            "generation": {
                "type": GenerationType.CHAT if is_chat else GenerationType.COMPLETION,
                "prompt": generation_content["prompt"] if not is_chat else None,
                "completion": generation_content["completion"] if not is_chat else None,
                "model": generation_content["model"],
                "provider": generation_content["provider"],
                "settings": generation_settings,
                "tokenCount": generation_settings["token_count"],
                "inputTokenCount": generation_settings["input_token_count"],
                "outputTokenCount": generation_settings["output_token_count"],
                "messages": generation_content["messages"],
                "messageCompletion": generation_content["message_completion"],
            },
        }

        step = Step.from_dict(cast(StepDict, step_dict))

        if not span.status.is_ok:
            step.error = span.status.description or "Unknown error"

        return step

    def extract_messages(
        self, data: Dict, prefix: str = "gen_ai.prompt."
    ) -> List[Dict]:
        """Collect indexed role/content message pairs from span attributes.

        Attributes are flattened as ``{prefix}{i}.role`` / ``{prefix}{i}.content``;
        iteration stops at the first index missing either key.
        """
        messages = []
        index = 0

        while True:
            role_key = f"{prefix}{index}.role"
            content_key = f"{prefix}{index}.content"

            if role_key not in data or content_key not in data:
                break

            messages.append(
                {
                    "role": data[role_key],
                    "content": self.extract_json(data[content_key]),
                }
            )

            index += 1

        return messages

    def extract_json(self, data: str) -> Dict | List | str:
        """Parse *data* as JSON, returning the raw string when it is not valid JSON."""
        try:
            content = json.loads(data)
        except Exception:
            content = data

        return content
7 changes: 6 additions & 1 deletion literalai/instrumentation/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@

from literalai.context import active_steps_var, active_thread_var
from literalai.helper import ensure_values_serializable
from literalai.observability.generation import GenerationMessage, CompletionGeneration, ChatGeneration, GenerationType
from literalai.observability.generation import (
GenerationMessage,
CompletionGeneration,
ChatGeneration,
GenerationType,
)
from literalai.wrappers import AfterContext, BeforeContext, wrap_all

logger = logging.getLogger(__name__)
Expand Down
37 changes: 35 additions & 2 deletions literalai/observability/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from pydantic import Field
from pydantic.dataclasses import dataclass
from traceloop.sdk import Traceloop
from typing_extensions import TypedDict

if TYPE_CHECKING:
Expand Down Expand Up @@ -69,7 +70,7 @@ class AttachmentDict(TypedDict, total=False):
@dataclass(repr=False)
class Score(Utils):
"""
A score captures information about the quality of a step/experiment item.
A score captures information about the quality of a step/experiment item.
It can be of type either:
- HUMAN: to capture human feedback
- CODE: to capture the result of a code execution (deterministic)
Expand Down Expand Up @@ -127,9 +128,10 @@ def from_dict(cls, score_dict: ScoreDict) -> "Score":
@dataclass(repr=False)
class Attachment(Utils):
"""
An attachment is an object that can be associated with a step.
An attachment is an object that can be associated with a step.
It can be an image, a file, a video, etc.
"""

step_id: Optional[str] = None
thread_id: Optional[str] = None
id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()))
Expand Down Expand Up @@ -522,6 +524,21 @@ async def __aenter__(self):

if active_root_run_var.get() is None and self.step_type == "run":
active_root_run_var.set(self.step)
Traceloop.set_association_properties(
{
"literal.thread_id": str(self.step.thread_id),
"literal.parent_id": self.step.id,
"literal.root_run_id": str(self.step.id),
}
)
else:
Traceloop.set_association_properties(
{
"literal.thread_id": str(self.thread_id),
"literal.parent_id": self.step.id,
"literal.root_run_id": str(self.step.root_run_id),
}
)

return self.step

Expand Down Expand Up @@ -549,6 +566,21 @@ def __enter__(self) -> Step:

if active_root_run_var.get() is None and self.step_type == "run":
active_root_run_var.set(self.step)
Traceloop.set_association_properties(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If OTel is not initialized, this won't be an issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it can be safely called even without initialization 👍

{
"literal.thread_id": str(self.step.thread_id),
"literal.parent_id": self.step.id,
"literal.root_run_id": str(self.step.id),
}
)
else:
Traceloop.set_association_properties(
{
"literal.thread_id": str(self.thread_id),
"literal.parent_id": self.step.id,
"literal.root_run_id": str(self.step.root_run_id),
}
)

return self.step

Expand Down Expand Up @@ -637,6 +669,7 @@ def sync_wrapper(*args, **kwargs):
step.output = {"content": deepcopy(result)}
except Exception:
pass

return result

return sync_wrapper
Loading
Loading