diff --git a/literalai/callback/langchain_callback.py b/literalai/callback/langchain_callback.py index e5d9638..43dba05 100644 --- a/literalai/callback/langchain_callback.py +++ b/literalai/callback/langchain_callback.py @@ -347,7 +347,7 @@ def _start_trace(self, run: Run) -> None: elif run.run_type == "embedding": step_type = "embedding" - if not self.steps: + if not self.steps and step_type != "llm": step_type = "run" step = self.client.start_step( diff --git a/literalai/callback/llama_index_callback.py b/literalai/callback/llama_index_callback.py index f864935..9b61633 100644 --- a/literalai/callback/llama_index_callback.py +++ b/literalai/callback/llama_index_callback.py @@ -55,6 +55,7 @@ def __init__( event_ends_to_ignore=event_ends_to_ignore, ) self.client = client + self.is_pristine = True self.steps = {} @@ -78,7 +79,9 @@ def on_event_start( **kwargs: Any, ) -> str: """Run when an event starts and return id of event.""" + step_type: TrueStepType = "undefined" + if event_type == CBEventType.RETRIEVE: step_type = "retrieval" elif event_type == CBEventType.QUERY: @@ -88,12 +91,26 @@ def on_event_start( else: return event_id + step_type = ( + "run" if self.is_pristine and step_type != "llm" else "undefined" + ) + + self.is_pristine = False + step = self.client.start_step( name=event_type.value, type=step_type, parent_id=self._get_parent_id(parent_id), id=event_id, ) + if event_type == CBEventType.EXCEPTION: + step.error = ( + payload.get(EventPayload.EXCEPTION, {}).get("message", "") + if payload + else None + ) + step.end() + return event_id self.steps[event_id] = step step.start_time = utc_now() step.input = payload or {} diff --git a/literalai/client.py b/literalai/client.py index ead6040..d96301f 100644 --- a/literalai/client.py +++ b/literalai/client.py @@ -158,7 +158,7 @@ def message( content: str = "", id: Optional[str] = None, parent_id: Optional[str] = None, - type: Optional[MessageStepType] = None, + type: Optional[MessageStepType] = "assistant_message", name: Optional[str] = None, thread_id: Optional[str] = None, attachments: List[Attachment] = [], diff --git a/literalai/event_processor.py b/literalai/event_processor.py index 413c53f..d7dfe55 100644 --- a/literalai/event_processor.py +++ b/literalai/event_processor.py @@ -1,9 +1,9 @@ import asyncio -import queue import logging -import traceback +import queue import threading import time +import traceback from typing import TYPE_CHECKING, List logger = logging.getLogger(__name__) @@ -14,6 +14,7 @@ DEFAULT_SLEEP_TIME = 0.2 + # to_thread is a backport of asyncio.to_thread from Python 3.9 async def to_thread(func, /, *args, **kwargs): import contextvars @@ -29,7 +30,6 @@ class EventProcessor: event_queue: queue.Queue batch: List["StepDict"] - def __init__(self, api: "LiteralAPI", batch_size: int = 5, disabled: bool = False): self.batch_size = batch_size self.api = api @@ -51,36 +51,36 @@ async def a_add_events(self, event: "StepDict"): def _process_events(self): while True: - self.batch = [] + batch = [] try: # Try to fill the batch up to the batch_size - while len(self.batch) < self.batch_size: + while len(batch) < self.batch_size: # Attempt to get events with a timeout event = self.event_queue.get(timeout=0.5) - self.batch.append(event) + batch.append(event) except queue.Empty: # No more events at the moment, proceed with processing what's in the batch pass # Process the batch if any events are present - in a separate thread - if self.batch: - self._process_batch() + if batch: + self._process_batch(batch) # Stop if the stop_event is set and the queue is empty if self.stop_event.is_set() and self.event_queue.empty(): break - def _try_process_batch(self): + def _try_process_batch(self, batch: List): try: - return self.api.send_steps(self.batch) + return self.api.send_steps(batch) except Exception: logger.error(f"Failed to send steps: {traceback.format_exc()}") return None - def _process_batch(self): + def _process_batch(self, batch: List): # Simple one-try retry in case of network failure (no retry on graphql errors) retries = 0 - while not self._try_process_batch() and retries < 1: + while not self._try_process_batch(batch) and retries < 1: retries += 1 time.sleep(DEFAULT_SLEEP_TIME) @@ -94,7 +94,7 @@ async def aflush(self): await asyncio.sleep(DEFAULT_SLEEP_TIME) def flush(self): - while not self.event_queue.empty() or len(self.batch) > 0: + while not self.event_queue.empty(): time.sleep(0.2) def __del__(self): diff --git a/literalai/message.py b/literalai/message.py index 8d58433..5e1eb2d 100644 --- a/literalai/message.py +++ b/literalai/message.py @@ -29,7 +29,7 @@ def __init__( self, content: str, id: Optional[str] = None, - type: Optional[MessageStepType] = None, + type: Optional[MessageStepType] = "assistant_message", name: Optional[str] = None, thread_id: Optional[str] = None, parent_id: Optional[str] = None, @@ -78,8 +78,10 @@ def end(self): if active_thread := active_thread_var.get(): self.thread_id = active_thread.id - if not self.thread_id: - raise Exception("Message must be initialized with a thread_id.") + if not self.thread_id and not self.parent_id: + raise Exception( + "Message must be initialized with a thread_id or a parent id." + ) if self.processor is None: raise Exception( diff --git a/setup.py b/setup.py index a1eb746..2a7f176 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="literalai", - version="0.0.606", # update version in literalai/version.py + version="0.0.607", # update version in literalai/version.py description="An SDK for observability in Python applications", author="Literal AI", package_data={"literalai": ["py.typed"]},