diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 64bd810..ba5dc9e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,6 +1,6 @@
 repos:
   - repo: https://github.com/pre-commit/pre-commit-hooks
-    rev: v4.5.0
+    rev: v5.0.0
     hooks:
       - id: trailing-whitespace
       - id: end-of-file-fixer
@@ -8,7 +8,7 @@ repos:
       - id: check-added-large-files
       - id: check-json
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.1.6
+    rev: v0.9.6
     hooks:
       - id: ruff-format
       - id: ruff
@@ -17,7 +17,7 @@
       - id: ruff
         name: ruff-linter
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v1.8.0
+    rev: v1.15.0
     hooks:
       - id: mypy
         additional_dependencies: ['types-requests']
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 186db9f..16f9173 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Changed
 - Change logging level to debug for flushing events and adding event to queue
 
+### Added
+- LLM events are now routed to the new publish_llm_events endpoint
+
 ## [1.8.2] - 2025-02-10
 ### Fixed
 - Added missing flushing limit values, and setup automated flush if limit is reached
diff --git a/examples/sdk_test.py b/examples/sdk_test.py
index 0fcf5c7..b879136 100644
--- a/examples/sdk_test.py
+++ b/examples/sdk_test.py
@@ -1,12 +1,17 @@
+import logging
 import os
 from trubrics import Trubrics
 
 trubrics = Trubrics(api_key=os.environ["TRUBRICS_API_KEY"])
 
-trubrics.track(event="Sign up", user_id="user_id")
+trubrics.logger.setLevel(logging.DEBUG)
+
+
+trubrics.track(event="Page view", user_id="user_id", properties={"page": "home"})
+trubrics.track(event="Page view", user_id="user_id", properties={"page": "events"})
 
 trubrics.track_llm(
-    user_id="user_id",
+    user_id="sdk_test",
     prompt="What is Trubrics?",
     assistant_id="gpt4o",
     generation="Trubrics is a product analytics platform for AI applications.",
diff --git a/trubrics/enums.py b/trubrics/enums.py
new file mode 100644
index 0000000..97c7138
--- /dev/null
+++ b/trubrics/enums.py
@@ -0,0 +1,11 @@
+from enum import Enum
+
+
+class EventTypes(Enum):
+    event = "event"
+    llm_event = "llm_event"
+
+
+class IngestionEndpoints(Enum):
+    events = "publish_events"
+    llm_events = "publish_llm_events"
diff --git a/trubrics/main.py b/trubrics/main.py
index 4897495..e6ec2c2 100644
--- a/trubrics/main.py
+++ b/trubrics/main.py
@@ -2,7 +2,7 @@
 import logging
 import threading
 import time
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timezone
 
 import requests
 
@@ -13,6 +13,7 @@
     MAX_FLUSH_BATCH_SIZE,
     MIN_FLUSH_INTERVAL,
 )
+from trubrics.enums import EventTypes, IngestionEndpoints
 from trubrics.logger import trubrics_logger
 
 
@@ -101,6 +102,7 @@ def track(
                 if timestamp
                 else datetime.now(timezone.utc).isoformat()
             ),
+            "event_type": EventTypes.event,
         }
         with self._lock:
             self.queue.append(event_dict)
@@ -129,28 +131,25 @@ def track_llm(
            timestamp (datetime | None): The timestamp of the generation event. If None, the current time in UTC is used.
            latency (float | None): The latency in seconds between the prompt and the generation. Defaults to 1.
 
""" - generation_timestamp = timestamp or datetime.now(timezone.utc) - prompt_timestamp = generation_timestamp - timedelta(seconds=latency or 1) - - self.track( - event="Prompt", - user_id=user_id, - properties={"$text": prompt, **(properties or {})}, - timestamp=prompt_timestamp, - ) - self.track( - event="Generation", - user_id=user_id, - properties={ - "$text": generation, - "$assistant_id": assistant_id, - "$prompt": prompt, - "latency(s)": latency, - **(properties or {}), - }, - timestamp=generation_timestamp, - ) + llm_event_dict = { + "user_id": user_id, + "prompt": prompt, + "generation": generation, + "assistant_id": assistant_id, + "properties": properties, + "timestamp": ( + timestamp.isoformat() + if timestamp + else datetime.now(timezone.utc).isoformat() + ), + "latency": latency, + "event_type": EventTypes.llm_event, + } + + with self._lock: + self.queue.append(llm_event_dict) + self.logger.debug(f"LLM event by user `{user_id}` has been added to queue.") def flush(self): """Flush the event queue.""" @@ -167,14 +166,7 @@ def flush(self): if events: for batch_id in range(0, len(events), self.flush_batch_size): batch = events[batch_id : batch_id + self.flush_batch_size] - success = self._post(batch) - - if not success: - self.logger.warning( - f"Retrying flush of batch {batch_id} of {len(batch)} events." - ) - time.sleep(5) - self._post(batch) + self._process_batch(batch, batch_id) self.last_flush_time = datetime.now(timezone.utc) self.logger.debug(f"Flush of {len(events)} events completed.") @@ -189,11 +181,55 @@ def close(self): self.flush() self.logger.debug("Background thread stopped and final flush completed.") - def _post(self, events: list[dict]): + def _process_batch(self, batch: list[dict], batch_id: int): + events = [] + llm_events = [] + for event in batch: + if event["event_type"] == EventTypes.llm_event: + event.pop("event_type") + llm_events.append(event) + elif event["event_type"] == EventTypes.event: + event.pop("event_type") + events.append(event) + + events_success = self._post(events, IngestionEndpoints.events, EventTypes.event) + llm_events_success = self._post( + llm_events, IngestionEndpoints.llm_events, EventTypes.llm_event + ) + + if not events_success: + self._retry_post( + events, IngestionEndpoints.events, EventTypes.event, batch_id + ) + + if not llm_events_success: + self._retry_post( + llm_events, + IngestionEndpoints.llm_events, + EventTypes.llm_event, + batch_id, + ) + + def _retry_post( + self, + events: list[dict], + endpoint: IngestionEndpoints, + event_type: EventTypes, + batch_id: int, + ): + self.logger.warning( + f"Retrying flush of {len(events)} {event_type.value}s in batch {batch_id}" + ) + time.sleep(5) + self._post(events, endpoint, event_type) + + def _post( + self, events: list[dict], endpoint: IngestionEndpoints, event_type: EventTypes + ): with requests.Session() as session: try: response = session.post( - f"{self.host}/publish_events", + f"{self.host}/{endpoint.value}", headers={ "Content-Type": "application/json", "x-api-key": self.api_key, @@ -201,7 +237,7 @@ def _post(self, events: list[dict]): json=events, ) response.raise_for_status() - self.logger.info(f"{len(events)} events sent to Trubrics.") + self.logger.info(f"{len(events)} {event_type.value}s sent to Trubrics.") return True except requests.exceptions.HTTPError as e: error_message = response.text if response.status_code != 200 else str(e) @@ -212,12 +248,12 @@ def _post(self, events: list[dict]): except json.JSONDecodeError: pass self.logger.error( - f"Error flushing 
+                    f"Error flushing {len(events)} {event_type.value}s: {error_message}"
                 )
                 return False
             except Exception as e:
                 self.logger.error(
-                    f"Unexpected error flushing {len(events)} events: {e}"
+                    f"Unexpected error flushing {len(events)} {event_type.value}s: {e}"
                 )
                 return False
 
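
A quick end-to-end sanity check of the new routing, as a minimal sketch rather than part of the patch: the dummy API key is hypothetical, it assumes the Trubrics constructor performs no eager key validation, and the private _post method is patched so nothing is sent over the network. A mixed batch should fan out into exactly one _post call per event type.

# sanity_check_routing.py -- hypothetical smoke test for the patched client
from unittest.mock import patch

from trubrics import Trubrics
from trubrics.enums import IngestionEndpoints

trubrics = Trubrics(api_key="dummy-key")  # assumed: no server-side key check on init

# Patch _post for the whole session so no HTTP traffic leaves the machine.
with patch.object(trubrics, "_post", return_value=True) as mock_post:
    trubrics.track(event="Page view", user_id="user_id")
    trubrics.track_llm(
        user_id="user_id",
        prompt="What is Trubrics?",
        assistant_id="gpt4o",
        generation="Trubrics is a product analytics platform for AI applications.",
    )
    trubrics.flush()

# _process_batch splits the mixed batch by event_type before posting,
# so each endpoint should appear exactly once across the recorded calls.
endpoints = {call.args[1] for call in mock_post.call_args_list}
assert endpoints == {IngestionEndpoints.events, IngestionEndpoints.llm_events}

Note that because _process_batch pops event_type before posting, the payloads stay JSON-serializable even though the queue stores EventTypes enum members, which requests could not encode via json=events.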