Clement/eng 2181 otel exporter #157
literalai client module (LiteralClient)

@@ -1,5 +1,10 @@
import json
import os
from traceloop.sdk import Traceloop
from typing import Any, Dict, List, Optional, Union
from typing_extensions import deprecated
import io
from contextlib import redirect_stdout
from literalai.api import AsyncLiteralAPI, LiteralAPI
from literalai.callback.langchain_callback import get_langchain_callback
@@ -10,6 +15,7 @@
    experiment_item_run_decorator,
)
from literalai.event_processor import EventProcessor
from literalai.exporter import LoggingSpanExporter
from literalai.instrumentation.mistralai import instrument_mistralai
from literalai.instrumentation.openai import instrument_openai
from literalai.my_types import Environment
@@ -92,18 +98,21 @@ def to_sync(self) -> "LiteralClient":
        else:
            return self  # type: ignore

    @deprecated("Use Literal.initialize instead")
    def instrument_openai(self):
        """
        Instruments the OpenAI SDK so that all LLM calls are logged to Literal AI.
        """
        instrument_openai(self.to_sync())

    @deprecated("Use Literal.initialize instead")
    def instrument_mistralai(self):
        """
        Instruments the Mistral AI SDK so that all LLM calls are logged to Literal AI.
        """
        instrument_mistralai(self.to_sync())

    @deprecated("Use Literal.initialize instead")
    def instrument_llamaindex(self):
        """
        Instruments the Llama Index framework so that all RAG & LLM calls are logged to Literal AI.

@@ -119,6 +128,13 @@ def instrument_llamaindex(self):

        instrument_llamaindex(self.to_sync())

    def initialize(self):
        with redirect_stdout(io.StringIO()):
            Traceloop.init(
                disable_batch=True,
                exporter=LoggingSpanExporter(event_processor=self.event_processor),
            )

[Review comment] (on the redirect_stdout call) Traceloop is verbose, so this is aimed at preventing its explicit print statements.
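For context, a minimal usage sketch of the new entry point (not part of the diff; assumes `LiteralClient` is exported from the package root and `LITERAL_API_KEY` is set in the environment):

```python
from literalai import LiteralClient

# Assumption: LITERAL_API_KEY is read from the environment.
client = LiteralClient()

# Installs the LoggingSpanExporter (see literalai/exporter.py below) behind
# Traceloop.init, replacing the per-SDK instrument_* calls deprecated above.
client.initialize()
```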
    def langchain_callback(
        self,
        to_ignore: Optional[List[str]] = None,
@@ -352,6 +368,27 @@ def get_current_root_run(self):
        """
        return active_root_run_var.get()

    def set_properties(
        self,
        name: Optional[str] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        thread = active_thread_var.get()
        root_run = active_root_run_var.get()
        parent = active_steps_var.get()[-1] if active_steps_var.get() else None

        Traceloop.set_association_properties(
            {
                "literal.thread_id": str(thread.id) if thread else "None",
                "literal.parent_id": str(parent.id) if parent else "None",
                "literal.root_run_id": str(root_run.id) if root_run else "None",
                "literal.name": str(name) if name else "None",
                "literal.tags": json.dumps(tags) if tags else "None",
                "literal.metadata": json.dumps(metadata) if metadata else "None",
            }
        )

[Review comment] (on set_association_properties) This is not an upsert, so we need to maintain explicit calls.
[Review comment] (on the attribute values) Can only store strings, hence the str()/json.dumps() serialization.
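Since `set_association_properties` is not an upsert (per the review note above), each call re-sends the full property set. A sketch of a call, with illustrative values:

```python
# Hypothetical sketch: attach Literal AI context to subsequent instrumented LLM calls.
client.set_properties(
    name="summarize-ticket",
    tags=["prod", "summarization"],
    metadata={"user_id": "u_123"},
)
# Unset arguments are serialized as the string "None", since association
# properties can only store strings (see review note above).
```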
    def reset_context(self):
        """
        Resets the context, forgetting active steps & setting current thread to None.
literalai/exporter.py (new file)

@@ -0,0 +1,209 @@
from datetime import datetime, timezone
import json
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from typing import Dict, List, Optional, Sequence, Union, cast
import logging

from literalai.event_processor import EventProcessor
from literalai.helper import utc_now
from literalai.observability.generation import GenerationType
from literalai.observability.step import Step, StepDict
class LoggingSpanExporter(SpanExporter):
    def __init__(
        self,
        logger_name: str = "span_exporter",
        event_processor: Optional[EventProcessor] = None,
    ):
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        self.event_processor = event_processor

        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        """Export the spans by logging them."""
        try:
            for span in spans:
                if (
                    span.attributes
                    and span.attributes.get("gen_ai.request.model", None) is not None
                    and self.event_processor is not None
                ):
                    step = self._create_step_from_span(span)
                    self.event_processor.add_event(cast(StepDict, step.to_dict()))

            return SpanExportResult.SUCCESS
        except Exception as e:
            self.logger.error(f"Failed to export spans: {e}")
            return SpanExportResult.FAILURE

    def shutdown(self):
        """Shuts down the exporter."""
        if self.event_processor is not None:
            return self.event_processor.flush_and_stop()

    def force_flush(self, timeout_millis: float = 30000) -> bool:
        """Force flush the exporter."""
        return True
    # TODO: Add generation promptId
    # TODO: Add generation variables
    # TODO: Check missing variables
    # TODO: ttFirstToken
    # TODO: duration
    # TODO: tokenThroughputInSeconds
    # TODO: Add tools
    # TODO: error check with gemini error

[Review comment] Haven't found a proper way to handle those as of now.
[Review comment] Could not find a way to handle gemini and ttFirstToken.
    def _create_step_from_span(self, span: ReadableSpan) -> Step:
        """Convert a span to a Step object."""
        attributes = span.attributes or {}

        start_time = (
            datetime.fromtimestamp(span.start_time / 1e9, tz=timezone.utc).isoformat()
            if span.start_time
            else utc_now()
        )
        end_time = (
            datetime.fromtimestamp(span.end_time / 1e9, tz=timezone.utc).isoformat()
            if span.end_time
            else utc_now()
        )

        generation_type = attributes.get("llm.request.type")
        is_chat = generation_type == "chat"

        span_props = {
            "parent_id": attributes.get(
                "traceloop.association.properties.literal.parent_id"
            ),
            "thread_id": attributes.get(
                "traceloop.association.properties.literal.thread_id"
            ),
            "root_run_id": attributes.get(
                "traceloop.association.properties.literal.root_run_id"
            ),
            "metadata": attributes.get(
                "traceloop.association.properties.literal.metadata"
            ),
            "tags": attributes.get("traceloop.association.properties.literal.tags"),
            "name": attributes.get("traceloop.association.properties.literal.name"),
        }

        span_props = {
            k: str(v) for k, v in span_props.items() if v is not None and v != "None"
        }

        generation_content = {
            "messages": (
                self.extract_messages(cast(Dict, attributes)) if is_chat else None
            ),
            "message_completion": (
                self.extract_messages(cast(Dict, attributes), "gen_ai.completion.")[0]
                if is_chat
                else None
            ),
            "prompt": attributes.get("gen_ai.prompt.0.user"),
            "completion": attributes.get("gen_ai.completion.0.content"),
            "model": attributes.get("gen_ai.request.model"),
            "provider": attributes.get("gen_ai.system"),
        }
        generation_settings = {
            "max_tokens": attributes.get("gen_ai.request.max_tokens"),
            "stream": attributes.get("llm.is_streaming"),
            "token_count": attributes.get("llm.usage.total_tokens"),
            "input_token_count": attributes.get("gen_ai.usage.prompt_tokens"),
            "output_token_count": attributes.get("gen_ai.usage.completion_tokens"),
            "frequency_penalty": attributes.get("gen_ai.request.frequency_penalty"),
            "presence_penalty": attributes.get("gen_ai.request.presence_penalty"),
            "temperature": attributes.get("gen_ai.request.temperature"),
            "top_p": attributes.get("gen_ai.request.top_p"),
        }

        step_dict = {
            "id": str(span.context.span_id) if span.context else None,
            "name": span_props.get("name", span.name),
            "type": "llm",
            "metadata": self.extract_json(span_props.get("metadata", "{}")),
            "startTime": start_time,
            "endTime": end_time,
            "threadId": span_props.get("thread_id"),
            "parentId": span_props.get("parent_id"),
            "rootRunId": span_props.get("root_run_id"),
            "tags": self.extract_json(span_props.get("tags", "[]")),
            "input": {
                "content": (
                    generation_content["messages"]
                    if is_chat
                    else generation_content["prompt"]
                )
            },
            "output": {
                "content": (
                    generation_content["message_completion"]
                    if is_chat
                    else generation_content["completion"]
                )
            },
            "generation": {
                "type": GenerationType.CHAT if is_chat else GenerationType.COMPLETION,
                "prompt": generation_content["prompt"] if not is_chat else None,
                "completion": generation_content["completion"] if not is_chat else None,
                "model": generation_content["model"],
                "provider": generation_content["provider"],
                "settings": generation_settings,
                "tokenCount": generation_settings["token_count"],
                "inputTokenCount": generation_settings["input_token_count"],
                "outputTokenCount": generation_settings["output_token_count"],
                "messages": generation_content["messages"],
                "messageCompletion": generation_content["message_completion"],
            },
        }

        step = Step.from_dict(cast(StepDict, step_dict))

        if not span.status.is_ok:
            step.error = span.status.description or "Unknown error"

        return step
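To make the translation concrete, here is a sketch of the flattened attributes a chat span might carry and what the method derives from them (attribute names are the ones read above; values are illustrative):

```python
# Hypothetical flattened attributes on an OpenLLMetry chat-completion span:
attributes = {
    "llm.request.type": "chat",             # -> is_chat = True
    "gen_ai.request.model": "gpt-4o-mini",  # -> generation.model
    "gen_ai.system": "openai",              # -> generation.provider
    "gen_ai.prompt.0.role": "user",
    "gen_ai.prompt.0.content": "Hello!",
    "gen_ai.completion.0.role": "assistant",
    "gen_ai.completion.0.content": "Hi there!",
    "gen_ai.usage.prompt_tokens": 9,        # -> inputTokenCount
    "gen_ai.usage.completion_tokens": 4,    # -> outputTokenCount
    "traceloop.association.properties.literal.thread_id": "thread-123",  # -> threadId
}
# The resulting step has type "llm" and generation type GenerationType.CHAT, with
# input.content  = [{"role": "user", "content": "Hello!"}] and
# output.content = {"role": "assistant", "content": "Hi there!"}.
```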
    def extract_messages(
        self, data: Dict, prefix: str = "gen_ai.prompt."
    ) -> List[Dict]:
        messages = []
        index = 0

        while True:
            role_key = f"{prefix}{index}.role"
            content_key = f"{prefix}{index}.content"

            if role_key not in data or content_key not in data:
                break

            messages.append(
                {
                    "role": data[role_key],
                    "content": self.extract_json(data[content_key]),
                }
            )

            index += 1

        return messages
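The walk stops at the first index whose role/content pair is missing, so messages must be contiguously indexed. A small illustration, derived from the code above:

```python
exporter = LoggingSpanExporter()

flat = {
    "gen_ai.prompt.0.role": "system",
    "gen_ai.prompt.0.content": "You are terse.",
    "gen_ai.prompt.1.role": "user",
    "gen_ai.prompt.1.content": "Hello!",
    # index 2 is absent, so the while-loop stops here
}
exporter.extract_messages(flat)
# -> [{"role": "system", "content": "You are terse."},
#     {"role": "user", "content": "Hello!"}]
```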
    def extract_json(self, data: str) -> Union[Dict, List, str]:
        try:
            content = json.loads(data)
        except Exception:
            content = data

        return content
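For reference, manual wiring equivalent to what `LiteralClient.initialize` does above (a sketch under the assumption that a `LiteralClient` instance exposes its `event_processor`, as the diff suggests):

```python
from traceloop.sdk import Traceloop

from literalai import LiteralClient
from literalai.exporter import LoggingSpanExporter

client = LiteralClient()  # assumes LITERAL_API_KEY is set in the environment
Traceloop.init(
    disable_batch=True,  # export spans one by one as they end
    exporter=LoggingSpanExporter(event_processor=client.event_processor),
)
```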
literalai/observability/step.py
@@ -16,6 +16,7 @@
from pydantic import Field
from pydantic.dataclasses import dataclass
from traceloop.sdk import Traceloop
from typing_extensions import TypedDict

if TYPE_CHECKING:
@@ -69,7 +70,7 @@ class AttachmentDict(TypedDict, total=False):
@dataclass(repr=False)
class Score(Utils):
    """
    A score captures information about the quality of a step/experiment item.
    It can be of type either:
    - HUMAN: to capture human feedback
    - CODE: to capture the result of a code execution (deterministic)
@@ -127,9 +128,10 @@ def from_dict(cls, score_dict: ScoreDict) -> "Score":
@dataclass(repr=False)
class Attachment(Utils):
    """
    An attachment is an object that can be associated with a step.
    It can be an image, a file, a video, etc.
    """

    step_id: Optional[str] = None
    thread_id: Optional[str] = None
    id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()))
@@ -522,6 +524,21 @@ async def __aenter__(self):

        if active_root_run_var.get() is None and self.step_type == "run":
            active_root_run_var.set(self.step)
            Traceloop.set_association_properties(
                {
                    "literal.thread_id": str(self.step.thread_id),
                    "literal.parent_id": self.step.id,
                    "literal.root_run_id": str(self.step.id),
                }
            )
        else:
            Traceloop.set_association_properties(
                {
                    "literal.thread_id": str(self.thread_id),
                    "literal.parent_id": self.step.id,
                    "literal.root_run_id": str(self.step.root_run_id),
                }
            )

        return self.step
@@ -549,6 +566,21 @@ def __enter__(self) -> Step:

        if active_root_run_var.get() is None and self.step_type == "run":
            active_root_run_var.set(self.step)
            Traceloop.set_association_properties(
                {
                    "literal.thread_id": str(self.step.thread_id),
                    "literal.parent_id": self.step.id,
                    "literal.root_run_id": str(self.step.id),
                }
            )
        else:
            Traceloop.set_association_properties(
                {
                    "literal.thread_id": str(self.thread_id),
                    "literal.parent_id": self.step.id,
                    "literal.root_run_id": str(self.step.root_run_id),
                }
            )

        return self.step

[Review comment] If OTel is not initialized, this won't be an issue?
[Author reply] Yes, it can be safely called even without initialization 👍
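Putting it together, a hypothetical end-to-end flow: entering a run step seeds the association properties, so LLM spans emitted inside the context are linked back to the Literal AI thread and run. The `thread`/`step` context-manager shapes are assumed from the SDK, and the OpenAI call is purely illustrative:

```python
from openai import OpenAI

from literalai import LiteralClient

client = LiteralClient()
client.initialize()  # installs LoggingSpanExporter via Traceloop
openai_client = OpenAI()

with client.thread(name="support-thread"):
    with client.step(name="answer", type="run"):
        # __enter__ above called Traceloop.set_association_properties, so the
        # span from this call carries literal.thread_id / literal.root_run_id.
        openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Hello!"}],
        )
```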
||
|
@@ -637,6 +669,7 @@ def sync_wrapper(*args, **kwargs): | |
step.output = {"content": deepcopy(result)} | ||
except Exception: | ||
pass | ||
|
||
return result | ||
|
||
return sync_wrapper |