
update llm step migration #80


Merged 1 commit on Jun 14, 2024

2 changes: 1 addition & 1 deletion literalai/callback/langchain_callback.py
```diff
@@ -347,7 +347,7 @@ def _start_trace(self, run: Run) -> None:
         elif run.run_type == "embedding":
             step_type = "embedding"
 
-        if not self.steps:
+        if not self.steps and step_type != "llm":
             step_type = "run"
 
         step = self.client.start_step(
```
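This change stops the first step of a LangChain trace from being coerced to the generic "run" type when it is an LLM call. A minimal standalone sketch of the updated branch (hypothetical helper, not the SDK's actual API):

```python
# Hypothetical standalone form of the new condition: the first step of a
# trace is promoted to "run" only when it is not an LLM generation.
def resolve_step_type(step_type: str, existing_steps: list) -> str:
    if not existing_steps and step_type != "llm":
        return "run"
    return step_type

assert resolve_step_type("llm", []) == "llm"                  # root LLM call keeps its type
assert resolve_step_type("embedding", []) == "run"            # other root steps become "run"
assert resolve_step_type("embedding", ["s1"]) == "embedding"  # non-root steps unchanged
```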
17 changes: 17 additions & 0 deletions literalai/callback/llama_index_callback.py
```diff
@@ -55,6 +55,7 @@ def __init__(
             event_ends_to_ignore=event_ends_to_ignore,
         )
         self.client = client
+        self.is_pristine = True
 
         self.steps = {}
@@ -78,7 +79,9 @@ def on_event_start(
         **kwargs: Any,
     ) -> str:
         """Run when an event starts and return id of event."""
+
         step_type: TrueStepType = "undefined"
+
         if event_type == CBEventType.RETRIEVE:
             step_type = "retrieval"
         elif event_type == CBEventType.QUERY:
@@ -88,12 +91,26 @@
         else:
             return event_id
 
+        step_type = (
+            "run" if self.is_pristine and step_type != "llm" else "undefined"
+        )
+
+        self.is_pristine = False
+
         step = self.client.start_step(
             name=event_type.value,
             type=step_type,
             parent_id=self._get_parent_id(parent_id),
             id=event_id,
         )
+        if event_type == CBEventType.EXCEPTION:
+            step.error = (
+                payload.get(EventPayload.EXCEPTION, {}).get("message", "")
+                if payload
+                else None
+            )
+            step.end()
+            return event_id
         self.steps[event_id] = step
         step.start_time = utc_now()
         step.input = payload or {}
```
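Two behavioral changes land here: an `is_pristine` flag types the first event of a trace as "run" unless it is an LLM call (mirroring the LangChain fix above), and `CBEventType.EXCEPTION` events now produce a step that is ended immediately with the error message pulled from the payload. A sketch of that extraction under assumed payload shapes (a plain string key stands in for `EventPayload.EXCEPTION`):

```python
from typing import Optional

# Hypothetical payload shapes illustrating the new exception handling:
# pull the message out of the payload when present, else leave None.
def extract_error(payload: Optional[dict]) -> Optional[str]:
    return (
        payload.get("exception", {}).get("message", "")
        if payload
        else None
    )

assert extract_error({"exception": {"message": "boom"}}) == "boom"
assert extract_error({}) == ""      # payload present, but no exception entry
assert extract_error(None) is None  # no payload at all
```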
2 changes: 1 addition & 1 deletion literalai/client.py
```diff
@@ -158,7 +158,7 @@ def message(
         content: str = "",
         id: Optional[str] = None,
         parent_id: Optional[str] = None,
-        type: Optional[MessageStepType] = None,
+        type: Optional[MessageStepType] = "assistant_message",
         name: Optional[str] = None,
         thread_id: Optional[str] = None,
         attachments: List[Attachment] = [],
```
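With this default, `client.message(...)` records an assistant message unless told otherwise. A usage sketch, assuming a configured client (the import path, constructor arguments, and ids are placeholders):

```python
from literalai import LiteralClient

client = LiteralClient(api_key="lsk-...")  # placeholder key

# Previously this created a message with type=None; the type now
# defaults to "assistant_message".
client.message(content="Hello!", thread_id="thread-123")

# An explicit type still overrides the default.
client.message(content="Hi!", thread_id="thread-123", type="user_message")
```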
26 changes: 13 additions & 13 deletions literalai/event_processor.py
```diff
@@ -1,9 +1,9 @@
 import asyncio
-import queue
 import logging
-import traceback
+import queue
 import threading
 import time
+import traceback
 from typing import TYPE_CHECKING, List
 
 logger = logging.getLogger(__name__)
@@ -14,6 +14,7 @@
 
 DEFAULT_SLEEP_TIME = 0.2
 
+
 # to_thread is a backport of asyncio.to_thread from Python 3.9
 async def to_thread(func, /, *args, **kwargs):
     import contextvars
@@ -29,7 +30,6 @@ class EventProcessor:
     event_queue: queue.Queue
     batch: List["StepDict"]
 
-
     def __init__(self, api: "LiteralAPI", batch_size: int = 5, disabled: bool = False):
         self.batch_size = batch_size
         self.api = api
@@ -51,36 +51,36 @@ async def a_add_events(self, event: "StepDict"):
 
     def _process_events(self):
         while True:
-            self.batch = []
+            batch = []
             try:
                 # Try to fill the batch up to the batch_size
-                while len(self.batch) < self.batch_size:
+                while len(batch) < self.batch_size:
                     # Attempt to get events with a timeout
                     event = self.event_queue.get(timeout=0.5)
-                    self.batch.append(event)
+                    batch.append(event)
             except queue.Empty:
                 # No more events at the moment, proceed with processing what's in the batch
                 pass
 
             # Process the batch if any events are present - in a separate thread
-            if self.batch:
-                self._process_batch()
+            if batch:
+                self._process_batch(batch)
 
             # Stop if the stop_event is set and the queue is empty
             if self.stop_event.is_set() and self.event_queue.empty():
                 break
 
-    def _try_process_batch(self):
+    def _try_process_batch(self, batch: List):
         try:
-            return self.api.send_steps(self.batch)
+            return self.api.send_steps(batch)
         except Exception:
             logger.error(f"Failed to send steps: {traceback.format_exc()}")
             return None
 
-    def _process_batch(self):
+    def _process_batch(self, batch: List):
         # Simple one-try retry in case of network failure (no retry on graphql errors)
         retries = 0
-        while not self._try_process_batch() and retries < 1:
+        while not self._try_process_batch(batch) and retries < 1:
             retries += 1
             time.sleep(DEFAULT_SLEEP_TIME)
 
@@ -94,7 +94,7 @@ async def aflush(self):
         await asyncio.sleep(DEFAULT_SLEEP_TIME)
 
     def flush(self):
-        while not self.event_queue.empty() or len(self.batch) > 0:
+        while not self.event_queue.empty():
             time.sleep(0.2)
 
     def __del__(self):
```
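The batch becomes a local variable threaded through `_process_batch` and `_try_process_batch` rather than shared `self.batch` state, so the worker thread and `flush()` no longer race on the same attribute (note that `flush()` now only watches the queue). A minimal sketch of the pattern as a hypothetical standalone worker, not the SDK's actual class:

```python
import queue
import threading
from typing import Callable, List

# Hypothetical standalone worker illustrating the refactor: each loop
# iteration builds its own local batch and hands it to the sender, so no
# mutable batch state is shared across threads.
def worker(q: queue.Queue, send: Callable[[List], None],
           stop: threading.Event, batch_size: int = 5) -> None:
    while True:
        batch = []  # local to this iteration; invisible to other threads
        try:
            while len(batch) < batch_size:
                batch.append(q.get(timeout=0.5))
        except queue.Empty:
            pass  # send whatever accumulated before the timeout
        if batch:
            send(batch)
        if stop.is_set() and q.empty():
            break
```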
8 changes: 5 additions & 3 deletions literalai/message.py
```diff
@@ -29,7 +29,7 @@ def __init__(
         self,
         content: str,
         id: Optional[str] = None,
-        type: Optional[MessageStepType] = None,
+        type: Optional[MessageStepType] = "assistant_message",
         name: Optional[str] = None,
         thread_id: Optional[str] = None,
         parent_id: Optional[str] = None,
@@ -78,8 +78,10 @@ def end(self):
         if active_thread := active_thread_var.get():
             self.thread_id = active_thread.id
 
-        if not self.thread_id:
-            raise Exception("Message must be initialized with a thread_id.")
+        if not self.thread_id and not self.parent_id:
+            raise Exception(
+                "Message must be initialized with a thread_id or a parent id."
+            )
 
         if self.processor is None:
             raise Exception(
```
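The check in `Message.end()` is relaxed: a message may now be nested under a parent step instead of belonging to a thread. A standalone sketch of the new rule (hypothetical helper, not the SDK's method):

```python
from typing import Optional

# Hypothetical distillation of the relaxed validation: a message is valid
# if it is attached to a thread OR nested under a parent step.
def validate(thread_id: Optional[str], parent_id: Optional[str]) -> None:
    if not thread_id and not parent_id:
        raise ValueError(
            "Message must be initialized with a thread_id or a parent id."
        )

validate(thread_id="t-123", parent_id=None)  # ok: belongs to a thread
validate(thread_id=None, parent_id="s-456")  # ok: nested under a step
# validate(None, None) would raise ValueError
```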
2 changes: 1 addition & 1 deletion setup.py
```diff
@@ -2,7 +2,7 @@
 
 setup(
     name="literalai",
-    version="0.0.606", # update version in literalai/version.py
+    version="0.0.607", # update version in literalai/version.py
     description="An SDK for observability in Python applications",
     author="Literal AI",
     package_data={"literalai": ["py.typed"]},
```