llm events endpoint #98

Merged: 7 commits on Feb 26, 2025
Changes from all commits

.pre-commit-config.yaml (6 changes: 3 additions & 3 deletions)

@@ -1,14 +1,14 @@
 repos:
   - repo: https://github.com/pre-commit/pre-commit-hooks
-    rev: v4.5.0
+    rev: v5.0.0
     hooks:
       - id: trailing-whitespace
       - id: end-of-file-fixer
       - id: check-yaml
       - id: check-added-large-files
       - id: check-json
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.1.6
+    rev: v0.9.6
     hooks:
       - id: ruff-format
       - id: ruff
@@ -17,7 +17,7 @@ repos:
       - id: ruff
         name: ruff-linter
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v1.8.0
+    rev: v1.15.0
     hooks:
       - id: mypy
         additional_dependencies: ['types-requests']

CHANGELOG.md (3 changes: 3 additions & 0 deletions)

@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Changed
 - Change logging level to debug for flushing events and adding event to queue

+### Added
+- LLM events are now routed to the new publish_llm_events endpoint
+
 ## [1.8.2] - 2025-02-10
 ### Fixed
 - Added missing flushing limit values, and setup automated flush if limit is reached
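
For orientation, here is a rough sketch of the new LLM event payload. The field names mirror the llm_event_dict built in trubrics/main.py further down; the values and the exact wire format are assumptions, not part of this PR.

```python
# Hypothetical LLM event as queued after this change (illustrative values).
llm_event = {
    "user_id": "sdk_test",
    "prompt": "What is Trubrics?",
    "generation": "Trubrics is a product analytics platform for AI applications.",
    "assistant_id": "gpt4o",
    "properties": None,
    "timestamp": "2025-02-26T12:00:00+00:00",
    "latency": 1,
}
# On flush, batches of these are POSTed to f"{host}/publish_llm_events"
# instead of being split into "Prompt"/"Generation" events on /publish_events.
```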

examples/sdk_test.py (9 changes: 7 additions & 2 deletions)

@@ -1,12 +1,17 @@
+import logging
 import os

 from trubrics import Trubrics

 trubrics = Trubrics(api_key=os.environ["TRUBRICS_API_KEY"])
-trubrics.track(event="Sign up", user_id="user_id")
+trubrics.logger.setLevel(logging.DEBUG)
+
+
+trubrics.track(event="Page view", user_id="user_id", properties={"page": "home"})
+trubrics.track(event="Page view", user_id="user_id", properties={"page": "events"})

 trubrics.track_llm(
-    user_id="user_id",
+    user_id="sdk_test",
     prompt="What is Trubrics?",
     assistant_id="gpt4o",
     generation="Trubrics is a product analytics platform for AI applications.",
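
The example stops at the tracking calls; as the trubrics/main.py diff below shows, events are buffered and sent by a background thread. A minimal sketch of how such a script could end, assuming you want delivery before the process exits (this ending is an assumption, not part of the diff):

```python
# Optional, illustrative: force delivery of anything still queued.
trubrics.flush()  # posts queued events immediately
trubrics.close()  # stops the background thread and performs a final flush
```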

trubrics/enums.py (new file: 11 additions & 0 deletions)

@@ -0,0 +1,11 @@
+from enum import Enum
+
+
+class EventTypes(Enum):
+    event = "event"
+    llm_event = "llm_event"
+
+
+class IngestionEndpoints(Enum):
+    events = "publish_events"
+    llm_events = "publish_llm_events"
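
These enums are how the flush path decides where a queued event goes. A small illustrative pairing follows; the ROUTING dict is not part of the SDK, it only mirrors how _process_batch and _post in trubrics/main.py use the two enums together.

```python
from trubrics.enums import EventTypes, IngestionEndpoints

# Illustrative pairing: each event type ends up on its own ingestion endpoint.
ROUTING = {
    EventTypes.event: IngestionEndpoints.events,          # -> /publish_events
    EventTypes.llm_event: IngestionEndpoints.llm_events,  # -> /publish_llm_events
}

assert ROUTING[EventTypes.llm_event].value == "publish_llm_events"
```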

trubrics/main.py (106 changes: 71 additions & 35 deletions)

@@ -2,7 +2,7 @@
 import logging
 import threading
 import time
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timezone

 import requests

@@ -13,6 +13,7 @@
     MAX_FLUSH_BATCH_SIZE,
     MIN_FLUSH_INTERVAL,
 )
+from trubrics.enums import EventTypes, IngestionEndpoints
 from trubrics.logger import trubrics_logger


@@ -101,6 +102,7 @@ def track(
                 if timestamp
                 else datetime.now(timezone.utc).isoformat()
             ),
+            "event_type": EventTypes.event,
         }
         with self._lock:
             self.queue.append(event_dict)
@@ -129,28 +131,25 @@ def track_llm(
             timestamp (datetime | None): The timestamp of the generation event. If None, the current time in UTC is used.
             latency (float | None): The latency in seconds between the prompt and the generation. Defaults to 1.
         """
-        generation_timestamp = timestamp or datetime.now(timezone.utc)
-        prompt_timestamp = generation_timestamp - timedelta(seconds=latency or 1)
-
-        self.track(
-            event="Prompt",
-            user_id=user_id,
-            properties={"$text": prompt, **(properties or {})},
-            timestamp=prompt_timestamp,
-        )
-
-        self.track(
-            event="Generation",
-            user_id=user_id,
-            properties={
-                "$text": generation,
-                "$assistant_id": assistant_id,
-                "$prompt": prompt,
-                "latency(s)": latency,
-                **(properties or {}),
-            },
-            timestamp=generation_timestamp,
-        )
+        llm_event_dict = {
+            "user_id": user_id,
+            "prompt": prompt,
+            "generation": generation,
+            "assistant_id": assistant_id,
+            "properties": properties,
+            "timestamp": (
+                timestamp.isoformat()
+                if timestamp
+                else datetime.now(timezone.utc).isoformat()
+            ),
+            "latency": latency,
+            "event_type": EventTypes.llm_event,
+        }
+
+        with self._lock:
+            self.queue.append(llm_event_dict)
+            self.logger.debug(f"LLM event by user `{user_id}` has been added to queue.")

     def flush(self):
         """Flush the event queue."""
@@ -167,14 +166,7 @@ def flush(self):
         if events:
             for batch_id in range(0, len(events), self.flush_batch_size):
                 batch = events[batch_id : batch_id + self.flush_batch_size]
-                success = self._post(batch)
-
-                if not success:
-                    self.logger.warning(
-                        f"Retrying flush of batch {batch_id} of {len(batch)} events."
-                    )
-                    time.sleep(5)
-                    self._post(batch)
+                self._process_batch(batch, batch_id)

             self.last_flush_time = datetime.now(timezone.utc)
             self.logger.debug(f"Flush of {len(events)} events completed.")
@@ -189,19 +181,63 @@ def close(self):
         self.flush()
         self.logger.debug("Background thread stopped and final flush completed.")

-    def _post(self, events: list[dict]):
+    def _process_batch(self, batch: list[dict], batch_id: int):
+        events = []
+        llm_events = []
+        for event in batch:
+            if event["event_type"] == EventTypes.llm_event:
+                event.pop("event_type")
+                llm_events.append(event)
+            elif event["event_type"] == EventTypes.event:
+                event.pop("event_type")
+                events.append(event)
+
+        events_success = self._post(events, IngestionEndpoints.events, EventTypes.event)
+        llm_events_success = self._post(
+            llm_events, IngestionEndpoints.llm_events, EventTypes.llm_event
+        )
+
+        if not events_success:
+            self._retry_post(
+                events, IngestionEndpoints.events, EventTypes.event, batch_id
+            )
+
+        if not llm_events_success:
+            self._retry_post(
+                llm_events,
+                IngestionEndpoints.llm_events,
+                EventTypes.llm_event,
+                batch_id,
+            )
+
+    def _retry_post(
+        self,
+        events: list[dict],
+        endpoint: IngestionEndpoints,
+        event_type: EventTypes,
+        batch_id: int,
+    ):
+        self.logger.warning(
+            f"Retrying flush of {len(events)} {event_type.value}s in batch {batch_id}"
+        )
+        time.sleep(5)
+        self._post(events, endpoint, event_type)
+
+    def _post(
+        self, events: list[dict], endpoint: IngestionEndpoints, event_type: EventTypes
+    ):
         with requests.Session() as session:
             try:
                 response = session.post(
-                    f"{self.host}/publish_events",
+                    f"{self.host}/{endpoint.value}",
                     headers={
                         "Content-Type": "application/json",
                         "x-api-key": self.api_key,
                     },
                     json=events,
                 )
                 response.raise_for_status()
-                self.logger.info(f"{len(events)} events sent to Trubrics.")
+                self.logger.info(f"{len(events)} {event_type.value}s sent to Trubrics.")
                 return True
             except requests.exceptions.HTTPError as e:
                 error_message = response.text if response.status_code != 200 else str(e)
@@ -212,12 +248,12 @@ def _post(self, events: list[dict]):
                 except json.JSONDecodeError:
                     pass
                 self.logger.error(
-                    f"Error flushing {len(events)} events: {error_message}"
+                    f"Error flushing {len(events)} {event_type.value}s: {error_message}"
                 )
                 return False
             except Exception as e:
                 self.logger.error(
-                    f"Unexpected error flushing {len(events)} events: {e}"
+                    f"Unexpected error flushing {len(events)} {event_type.value}s: {e}"
                 )
                 return False

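
Putting the main.py changes together, here is a rough sketch of what the new flush path does with a mixed batch. It is illustrative only: the helper names come from the diff above, and calling the private _process_batch directly (which performs real HTTP POSTs) is not something the SDK asks users to do.

```python
import os

from trubrics import Trubrics
from trubrics.enums import EventTypes

client = Trubrics(api_key=os.environ["TRUBRICS_API_KEY"])

# Two queued events carrying the internal event_type tag added by track/track_llm
# (other fields are omitted here for brevity).
mixed_batch = [
    {"event": "Page view", "user_id": "u1", "event_type": EventTypes.event},
    {"prompt": "Hi", "generation": "Hello!", "user_id": "u1", "event_type": EventTypes.llm_event},
]

# _process_batch pops the tag, splits the batch by type, and POSTs each group:
#   plain events -> {host}/publish_events
#   LLM events   -> {host}/publish_llm_events
# A failed POST is retried once after 5 seconds via _retry_post.
client._process_batch(mixed_batch, batch_id=0)
```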