diff --git a/examples/langchain_toolcall.py b/examples/langchain_toolcall.py index 6fc3939..f9fc60f 100644 --- a/examples/langchain_toolcall.py +++ b/examples/langchain_toolcall.py @@ -19,6 +19,7 @@ tools = [search] lai_client = LiteralClient() +lai_client.initialize() lai_prompt = lai_client.api.get_or_create_prompt( name="LC Agent", settings={ @@ -37,13 +38,13 @@ {"role": "assistant", "content": "{{agent_scratchpad}}"}, ], ) -prompt = lai_prompt.to_langchain_chat_prompt_template() +prompt = lai_prompt.to_langchain_chat_prompt_template( + additional_messages=[("placeholder", "{agent_scratchpad}")], +) agent: BaseSingleActionAgent = create_tool_calling_agent(model, tools, prompt) # type: ignore agent_executor = AgentExecutor(agent=agent, tools=tools) -cb = lai_client.langchain_callback() - # Replace with ainvoke for asynchronous execution. agent_executor.invoke( { @@ -56,5 +57,5 @@ ], "input": "whats the weather in sf?", }, - config=RunnableConfig(callbacks=[cb], run_name="Weather SF"), + config=RunnableConfig(run_name="Weather SF"), ) diff --git a/examples/langchain_variable.py b/examples/langchain_variable.py index c58af52..d0da94c 100644 --- a/examples/langchain_variable.py +++ b/examples/langchain_variable.py @@ -1,12 +1,13 @@ from langchain.chat_models import init_chat_model from literalai import LiteralClient -from langchain.schema.runnable.config import RunnableConfig + from dotenv import load_dotenv load_dotenv() lai = LiteralClient() +lai.initialize() prompt = lai.api.get_or_create_prompt( name="user intent", @@ -29,13 +30,14 @@ input_messages = messages.format_messages( user_message="The screen is cracked, there are scratches on the surface, and a component is missing." ) -cb = lai.langchain_callback() # Returns a langchain_openai.ChatOpenAI instance. gpt_4o = init_chat_model( # type: ignore model_provider=prompt.provider, **prompt.settings, ) -print(gpt_4o.invoke(input_messages, config=RunnableConfig(callbacks=[cb]))) + +lai.set_properties(prompt=prompt) +print(gpt_4o.invoke(input_messages)) lai.flush_and_stop() diff --git a/examples/llamaindex_workflow.py b/examples/llamaindex_workflow.py new file mode 100644 index 0000000..c580e40 --- /dev/null +++ b/examples/llamaindex_workflow.py @@ -0,0 +1,55 @@ +import asyncio +from llama_index.core.workflow import ( + Event, + StartEvent, + StopEvent, + Workflow, + step, +) +from llama_index.llms.openai import OpenAI +from literalai.client import LiteralClient + +lai_client = LiteralClient() +lai_client.initialize() + + +class JokeEvent(Event): + joke: str + +class RewriteJoke(Event): + joke: str + + +class JokeFlow(Workflow): + llm = OpenAI() + + @step() + async def generate_joke(self, ev: StartEvent) -> JokeEvent: + topic = ev.topic + + prompt = f"Write your best joke about {topic}." 
+ response = await self.llm.acomplete(prompt) + return JokeEvent(joke=str(response)) + + @step() + async def return_joke(self, ev: JokeEvent) -> RewriteJoke: + return RewriteJoke(joke=ev.joke + "What is funny?") + + @step() + async def critique_joke(self, ev: RewriteJoke) -> StopEvent: + joke = ev.joke + + prompt = f"Give a thorough analysis and critique of the following joke: {joke}" + response = await self.llm.acomplete(prompt) + return StopEvent(result=str(response)) + + +@lai_client.thread(name="JokeFlow") +async def main(): + w = JokeFlow(timeout=60, verbose=False) + result = await w.run(topic="pirates") + print(str(result)) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/multimodal.py b/examples/multimodal.py new file mode 100644 index 0000000..f8e4a54 --- /dev/null +++ b/examples/multimodal.py @@ -0,0 +1,83 @@ +import base64 +import requests # type: ignore +import time + +from literalai import LiteralClient +from openai import OpenAI + +from dotenv import load_dotenv + + +load_dotenv() + +openai_client = OpenAI() + +literalai_client = LiteralClient() +literalai_client.initialize() + + +def encode_image(url): + return base64.b64encode(requests.get(url).content) + + +@literalai_client.step(type="run") +def generate_answer(user_query, image_url): + literalai_client.set_properties( + name="foobar", + metadata={"foo": "bar"}, + tags=["foo", "bar"], + ) + completion = openai_client.chat.completions.create( + model="gpt-4o-mini", + messages=[ + { + "role": "user", + "content": [ + {"type": "text", "text": user_query}, + { + "type": "image_url", + "image_url": {"url": image_url}, + }, + ], + }, + ], + max_tokens=300, + ) + return completion.choices[0].message.content + + +def main(): + with literalai_client.thread(name="Meal Analyzer") as thread: + welcome_message = ( + "Welcome to the meal analyzer, please upload an image of your plate!" + ) + literalai_client.message( + content=welcome_message, type="assistant_message", name="My Assistant" + ) + + user_query = "Is this a healthy meal?" + user_image = "https://www.eatthis.com/wp-content/uploads/sites/4/2021/05/healthy-plate.jpg" + user_step = literalai_client.message( + content=user_query, type="user_message", name="User" + ) + + time.sleep(1) # to make sure the user step has arrived at Literal AI + + literalai_client.api.create_attachment( + thread_id=thread.id, + step_id=user_step.id, + name="meal_image", + content=encode_image(user_image), + ) + + answer = generate_answer(user_query=user_query, image_url=user_image) + literalai_client.message( + content=answer, type="assistant_message", name="My Assistant" + ) + + +main() +# Network requests by the SDK are performed asynchronously. +# Invoke flush_and_stop() to guarantee the completion of all requests prior to the process termination. +# WARNING: If you run a continuous server, you should not use this method. 
+literalai_client.flush_and_stop() diff --git a/examples/streaming.py b/examples/streaming.py index ed47a3e..1c884a1 100644 --- a/examples/streaming.py +++ b/examples/streaming.py @@ -12,7 +12,7 @@ sdk = LiteralClient(batch_size=2) -sdk.instrument_openai() +sdk.initialize() @sdk.thread diff --git a/literalai/client.py b/literalai/client.py index 2156637..bda138c 100644 --- a/literalai/client.py +++ b/literalai/client.py @@ -1,5 +1,10 @@ +import json import os +from traceloop.sdk import Traceloop from typing import Any, Dict, List, Optional, Union +from typing_extensions import deprecated +import io +from contextlib import redirect_stdout from literalai.api import AsyncLiteralAPI, LiteralAPI from literalai.callback.langchain_callback import get_langchain_callback @@ -10,6 +15,7 @@ experiment_item_run_decorator, ) from literalai.event_processor import EventProcessor +from literalai.exporter import LoggingSpanExporter from literalai.instrumentation.mistralai import instrument_mistralai from literalai.instrumentation.openai import instrument_openai from literalai.my_types import Environment @@ -23,6 +29,7 @@ step_decorator, ) from literalai.observability.thread import ThreadContextManager, thread_decorator +from literalai.prompt_engineering.prompt import Prompt from literalai.requirements import check_all_requirements @@ -92,18 +99,21 @@ def to_sync(self) -> "LiteralClient": else: return self # type: ignore + @deprecated("Use Literal.initialize instead") def instrument_openai(self): """ Instruments the OpenAI SDK so that all LLM calls are logged to Literal AI. """ instrument_openai(self.to_sync()) + @deprecated("Use Literal.initialize instead") def instrument_mistralai(self): """ Instruments the Mistral AI SDK so that all LLM calls are logged to Literal AI. """ instrument_mistralai(self.to_sync()) + @deprecated("Use Literal.initialize instead") def instrument_llamaindex(self): """ Instruments the Llama Index framework so that all RAG & LLM calls are logged to Literal AI. @@ -119,6 +129,13 @@ def instrument_llamaindex(self): instrument_llamaindex(self.to_sync()) + def initialize(self): + with redirect_stdout(io.StringIO()): + Traceloop.init( + disable_batch=True, + exporter=LoggingSpanExporter(event_processor=self.event_processor), + ) + def langchain_callback( self, to_ignore: Optional[List[str]] = None, @@ -352,6 +369,29 @@ def get_current_root_run(self): """ return active_root_run_var.get() + def set_properties( + self, + name: Optional[str] = None, + tags: Optional[List[str]] = None, + metadata: Optional[Dict[str, Any]] = None, + prompt: Optional[Prompt] = None, + ): + thread = active_thread_var.get() + root_run = active_root_run_var.get() + parent = active_steps_var.get()[-1] if active_steps_var.get() else None + + Traceloop.set_association_properties( + { + "literal.thread_id": str(thread.id) if thread else "None", + "literal.parent_id": str(parent.id) if parent else "None", + "literal.root_run_id": str(root_run.id) if root_run else "None", + "literal.name": str(name) if name else "None", + "literal.tags": json.dumps(tags) if tags else "None", + "literal.metadata": json.dumps(metadata) if metadata else "None", + "literal.prompt": json.dumps(prompt.to_dict()) if prompt else "None", + } + ) + def reset_context(self): """ Resets the context, forgetting active steps & setting current thread to None. 
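
Note on usage (not part of the patch): the new initialize() / set_properties() pair added to LiteralClient above is exercised by the updated examples in this change. A minimal sketch of the intended call pattern, assuming an OpenAI API key is configured; the step name, tags, and metadata below are illustrative only:

    from literalai import LiteralClient
    from openai import OpenAI

    lai = LiteralClient()
    lai.initialize()  # single entry point; wires Traceloop to LoggingSpanExporter

    openai_client = OpenAI()

    @lai.step(type="run")
    def answer(question: str):
        # Attach Literal-specific association properties to spans emitted below
        # (illustrative values, mirroring examples/multimodal.py).
        lai.set_properties(
            name="answer",
            tags=["demo"],
            metadata={"source": "sketch"},
        )
        completion = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": question}],
        )
        return completion.choices[0].message.content

    print(answer("Is this a healthy meal?"))
    lai.flush_and_stop()
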
diff --git a/literalai/exporter.py b/literalai/exporter.py new file mode 100644 index 0000000..28877e3 --- /dev/null +++ b/literalai/exporter.py @@ -0,0 +1,245 @@ +from datetime import datetime, timezone +import json +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from typing import Dict, List, Optional, Sequence, Union, cast +import logging + + +from literalai.event_processor import EventProcessor +from literalai.helper import utc_now +from literalai.observability.generation import GenerationType +from literalai.observability.step import Step, StepDict +from literalai.prompt_engineering.prompt import PromptDict + + +# TODO: Suppport Gemini models https://github.com/traceloop/openllmetry/issues/2419 +# TODO: Support llamaindex workflow https://github.com/traceloop/openllmetry/pull/2421 +class LoggingSpanExporter(SpanExporter): + def __init__( + self, + logger_name: str = "span_exporter", + event_processor: Optional[EventProcessor] = None, + ): + self.logger = logging.getLogger(logger_name) + self.logger.setLevel(logging.INFO) + self.event_processor = event_processor + + if not self.logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + handler.setFormatter(formatter) + self.logger.addHandler(handler) + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + """Export the spans by logging them.""" + try: + for span in spans: + if ( + span.attributes + and span.attributes.get("gen_ai.request.model", None) is not None + and self.event_processor is not None + ): + step = self._create_step_from_span(span) + self.event_processor.add_event(cast(StepDict, step.to_dict())) + + return SpanExportResult.SUCCESS + except Exception as e: + self.logger.error(f"Failed to export spans: {e}") + return SpanExportResult.FAILURE + + def shutdown(self): + """Shuts down the exporter.""" + if self.event_processor is not None: + return self.event_processor.flush_and_stop() + + def force_flush(self, timeout_millis: float = 30000) -> bool: + """Force flush the exporter.""" + return True + + def _create_step_from_span(self, span: ReadableSpan) -> Step: + """Convert a span to a Step object""" + attributes = span.attributes or {} + + start_time = ( + datetime.fromtimestamp(span.start_time / 1e9, tz=timezone.utc).isoformat() + if span.start_time + else utc_now() + ) + end_time = ( + datetime.fromtimestamp(span.end_time / 1e9, tz=timezone.utc).isoformat() + if span.end_time + else utc_now() + ) + duration, token_throughput = self._calculate_duration_and_throughput( + span.start_time, + span.end_time, + int(str(attributes.get("llm.usage.total_tokens", 0))), + ) + + generation_type = attributes.get("llm.request.type") + is_chat = generation_type == "chat" + + span_props = { + "parent_id": attributes.get( + "traceloop.association.properties.literal.parent_id" + ), + "thread_id": attributes.get( + "traceloop.association.properties.literal.thread_id" + ), + "root_run_id": attributes.get( + "traceloop.association.properties.literal.root_run_id" + ), + "metadata": attributes.get( + "traceloop.association.properties.literal.metadata" + ), + "tags": attributes.get("traceloop.association.properties.literal.tags"), + "name": attributes.get("traceloop.association.properties.literal.name"), + } + + span_props = { + k: str(v) for k, v in span_props.items() if v is not None and v != "None" + } + + serialized_prompt = attributes.get( + 
"traceloop.association.properties.literal.prompt" + ) + prompt = cast( + Optional[PromptDict], + ( + self._extract_json(str(serialized_prompt)) + if serialized_prompt and serialized_prompt != "None" + else None + ), + ) + + generation_content = { + "duration": duration, + "messages": ( + self._extract_messages(cast(Dict, attributes)) if is_chat else None + ), + "message_completion": ( + self._extract_messages(cast(Dict, attributes), "gen_ai.completion.")[0] + if is_chat + else None + ), + "prompt": attributes.get("gen_ai.prompt.0.user"), + "promptId": prompt.get("id") if prompt else None, + "completion": attributes.get("gen_ai.completion.0.content"), + "model": attributes.get("gen_ai.request.model"), + "provider": attributes.get("gen_ai.system"), + "tokenThroughputInSeconds": token_throughput, + "variables": prompt.get("variables") if prompt else None, + } + generation_settings = { + "max_tokens": attributes.get("gen_ai.request.max_tokens"), + "stream": attributes.get("llm.is_streaming"), + "token_count": attributes.get("llm.usage.total_tokens"), + "input_token_count": attributes.get("gen_ai.usage.prompt_tokens"), + "output_token_count": attributes.get("gen_ai.usage.completion_tokens"), + "frequency_penalty": attributes.get("gen_ai.request.frequency_penalty"), + "presence_penalty": attributes.get("gen_ai.request.presence_penalty"), + "temperature": attributes.get("gen_ai.request.temperature"), + "top_p": attributes.get("gen_ai.request.top_p"), + } + + step_dict = { + "id": str(span.context.span_id) if span.context else None, + "name": span_props.get("name", span.name), + "type": "llm", + "metadata": self._extract_json(str(span_props.get("metadata", "{}"))), + "startTime": start_time, + "endTime": end_time, + "threadId": span_props.get("thread_id"), + "parentId": span_props.get("parent_id"), + "rootRunId": span_props.get("root_run_id"), + "tags": self._extract_json(str(span_props.get("tags", "[]"))), + "input": { + "content": ( + generation_content["messages"] + if is_chat + else generation_content["prompt"] + ) + }, + "output": { + "content": ( + generation_content["message_completion"] + if is_chat + else generation_content["completion"] + ) + }, + "generation": { + "type": GenerationType.CHAT if is_chat else GenerationType.COMPLETION, + "prompt": generation_content["prompt"] if not is_chat else None, + "completion": generation_content["completion"] if not is_chat else None, + "model": generation_content["model"], + "provider": generation_content["provider"], + "settings": generation_settings, + "tokenCount": generation_settings["token_count"], + "inputTokenCount": generation_settings["input_token_count"], + "outputTokenCount": generation_settings["output_token_count"], + "messages": generation_content["messages"], + "messageCompletion": generation_content["message_completion"], + }, + } + + step = Step.from_dict(cast(StepDict, step_dict)) + + if not span.status.is_ok: + step.error = span.status.description or "Unknown error" + + return step + + def _extract_messages( + self, data: Dict, prefix: str = "gen_ai.prompt." 
+ ) -> List[Dict]: + messages = [] + index = 0 + + while True: + role_key = f"{prefix}{index}.role" + content_key = f"{prefix}{index}.content" + + if role_key not in data or content_key not in data: + break + if data[role_key] == "placeholder": + break + + messages.append( + { + "role": data[role_key], + "content": self._extract_json(data[content_key]), + } + ) + + index += 1 + + return messages + + def _extract_json(self, data: str) -> Union[Dict, List, str]: + try: + content = json.loads(data) + except Exception: + content = data + + return content + + def _calculate_duration_and_throughput( + self, + start_time_ns: Optional[int], + end_time_ns: Optional[int], + total_tokens: Optional[int], + ) -> tuple[float, Optional[float]]: + """Calculate duration in seconds and token throughput per second.""" + duration_ns = ( + end_time_ns - start_time_ns if start_time_ns and end_time_ns else 0 + ) + duration_seconds = duration_ns / 1e9 + + token_throughput = None + if total_tokens is not None and duration_seconds > 0: + token_throughput = total_tokens / duration_seconds + + return duration_seconds, token_throughput diff --git a/literalai/instrumentation/openai.py b/literalai/instrumentation/openai.py index 8922ced..b1554a4 100644 --- a/literalai/instrumentation/openai.py +++ b/literalai/instrumentation/openai.py @@ -10,7 +10,12 @@ from literalai.context import active_steps_var, active_thread_var from literalai.helper import ensure_values_serializable -from literalai.observability.generation import GenerationMessage, CompletionGeneration, ChatGeneration, GenerationType +from literalai.observability.generation import ( + GenerationMessage, + CompletionGeneration, + ChatGeneration, + GenerationType, +) from literalai.wrappers import AfterContext, BeforeContext, wrap_all logger = logging.getLogger(__name__) diff --git a/literalai/observability/step.py b/literalai/observability/step.py index 8a3ee89..27d3c51 100644 --- a/literalai/observability/step.py +++ b/literalai/observability/step.py @@ -16,6 +16,7 @@ from pydantic import Field from pydantic.dataclasses import dataclass +from traceloop.sdk import Traceloop from typing_extensions import TypedDict if TYPE_CHECKING: @@ -69,7 +70,7 @@ class AttachmentDict(TypedDict, total=False): @dataclass(repr=False) class Score(Utils): """ - A score captures information about the quality of a step/experiment item. + A score captures information about the quality of a step/experiment item. It can be of type either: - HUMAN: to capture human feedback - CODE: to capture the result of a code execution (deterministic) @@ -127,9 +128,10 @@ def from_dict(cls, score_dict: ScoreDict) -> "Score": @dataclass(repr=False) class Attachment(Utils): """ - An attachment is an object that can be associated with a step. + An attachment is an object that can be associated with a step. It can be an image, a file, a video, etc. 
""" + step_id: Optional[str] = None thread_id: Optional[str] = None id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4())) @@ -522,6 +524,21 @@ async def __aenter__(self): if active_root_run_var.get() is None and self.step_type == "run": active_root_run_var.set(self.step) + Traceloop.set_association_properties( + { + "literal.thread_id": str(self.step.thread_id), + "literal.parent_id": self.step.id, + "literal.root_run_id": str(self.step.id), + } + ) + else: + Traceloop.set_association_properties( + { + "literal.thread_id": str(self.thread_id), + "literal.parent_id": self.step.id, + "literal.root_run_id": str(self.step.root_run_id), + } + ) return self.step @@ -549,6 +566,21 @@ def __enter__(self) -> Step: if active_root_run_var.get() is None and self.step_type == "run": active_root_run_var.set(self.step) + Traceloop.set_association_properties( + { + "literal.thread_id": str(self.step.thread_id), + "literal.parent_id": self.step.id, + "literal.root_run_id": str(self.step.id), + } + ) + else: + Traceloop.set_association_properties( + { + "literal.thread_id": str(self.thread_id), + "literal.parent_id": self.step.id, + "literal.root_run_id": str(self.step.root_run_id), + } + ) return self.step @@ -637,6 +669,7 @@ def sync_wrapper(*args, **kwargs): step.output = {"content": deepcopy(result)} except Exception: pass + return result return sync_wrapper diff --git a/literalai/observability/thread.py b/literalai/observability/thread.py index 87a22d4..a681f52 100644 --- a/literalai/observability/thread.py +++ b/literalai/observability/thread.py @@ -5,6 +5,8 @@ from functools import wraps from typing import TYPE_CHECKING, Callable, Dict, List, Optional, TypedDict +from traceloop.sdk import Traceloop + from literalai.context import active_thread_var from literalai.my_types import UserDict, Utils from literalai.observability.step import Step, StepDict @@ -188,6 +190,11 @@ def __call__(self, func): def __enter__(self) -> "Optional[Thread]": thread_id = self.thread_id if self.thread_id else str(uuid.uuid4()) active_thread_var.set(Thread(id=thread_id, name=self.name, **self.kwargs)) + Traceloop.set_association_properties( + { + "literal.thread_id": thread_id, + } + ) return active_thread_var.get() def __exit__(self, exc_type, exc_val, exc_tb): @@ -198,6 +205,11 @@ def __exit__(self, exc_type, exc_val, exc_tb): async def __aenter__(self): thread_id = self.thread_id if self.thread_id else str(uuid.uuid4()) active_thread_var.set(Thread(id=thread_id, name=self.name, **self.kwargs)) + Traceloop.set_association_properties( + { + "literal.thread_id": thread_id, + } + ) return active_thread_var.get() async def __aexit__(self, exc_type, exc_val, exc_tb): diff --git a/mypy.ini b/mypy.ini index d3aaeb3..7e4bc03 100644 --- a/mypy.ini +++ b/mypy.ini @@ -8,4 +8,11 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-langchain_community.*] -ignore_missing_imports = True \ No newline at end of file +ignore_missing_imports = True + +[mypy-traceloop.*] +ignore_missing_imports = True + +[mypy-llama_index.*] +ignore_missing_imports = True + diff --git a/requirements.txt b/requirements.txt index 836bff6..f1d4670 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ packaging==23.2 httpx>=0.23.0 pydantic>=1,<3 openai>=1.0.0 -chevron>=0.14.0 \ No newline at end of file +chevron>=0.14.0 +traceloop-sdk>=0.33.12