From 397abeb3ac5029029f327812f7a454529e005449 Mon Sep 17 00:00:00 2001 From: guillaumefontan Date: Mon, 24 Feb 2025 17:17:56 +0100 Subject: [PATCH 1/7] llm attributes endpoint --- examples/sdk_test.py | 9 ++++- trubrics/enums.py | 11 ++++++ trubrics/main.py | 92 +++++++++++++++++++++++++++----------------- 3 files changed, 75 insertions(+), 37 deletions(-) create mode 100644 trubrics/enums.py diff --git a/examples/sdk_test.py b/examples/sdk_test.py index 0fcf5c7..cfa6dff 100644 --- a/examples/sdk_test.py +++ b/examples/sdk_test.py @@ -1,9 +1,14 @@ +import logging import os from trubrics import Trubrics -trubrics = Trubrics(api_key=os.environ["TRUBRICS_API_KEY"]) -trubrics.track(event="Sign up", user_id="user_id") +trubrics = Trubrics(api_key=os.environ["TRUBRICS_API_KEY"], host="http://127.0.0.1:8001/api/ingestion") +trubrics.logger.setLevel(logging.DEBUG) + + +trubrics.track(event="Page view", user_id="user_id", properties={"page": "home"}) +trubrics.track(event="Page view", user_id="user_id", properties={"page": "events"}) trubrics.track_llm( user_id="user_id", diff --git a/trubrics/enums.py b/trubrics/enums.py new file mode 100644 index 0000000..65b12b0 --- /dev/null +++ b/trubrics/enums.py @@ -0,0 +1,11 @@ +from enum import Enum + + +class EventTypes(Enum): + event = "event" + llm_event = "llm_event" + + +class IngestionEndpoints(Enum): + events = "publish_events" + llm_events = "publish_llm_events" \ No newline at end of file diff --git a/trubrics/main.py b/trubrics/main.py index 4897495..f280499 100644 --- a/trubrics/main.py +++ b/trubrics/main.py @@ -2,7 +2,7 @@ import logging import threading import time -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone import requests @@ -13,6 +13,7 @@ MAX_FLUSH_BATCH_SIZE, MIN_FLUSH_INTERVAL, ) +from trubrics.enums import IngestionEndpoints, EventTypes from trubrics.logger import trubrics_logger @@ -101,6 +102,7 @@ def track( if timestamp else 
datetime.now(timezone.utc).isoformat() ), + "event_type": EventTypes.event, } with self._lock: self.queue.append(event_dict) @@ -129,28 +131,27 @@ def track_llm( timestamp (datetime | None): The timestamp of the generation event. If None, the current time in UTC is used. latency (float | None): The latency in seconds between the prompt and the generation. Defaults to 1. """ - generation_timestamp = timestamp or datetime.now(timezone.utc) - prompt_timestamp = generation_timestamp - timedelta(seconds=latency or 1) - - self.track( - event="Prompt", - user_id=user_id, - properties={"$text": prompt, **(properties or {})}, - timestamp=prompt_timestamp, - ) - self.track( - event="Generation", - user_id=user_id, - properties={ - "$text": generation, - "$assistant_id": assistant_id, - "$prompt": prompt, - "latency(s)": latency, - **(properties or {}), - }, - timestamp=generation_timestamp, - ) + llm_event_dict = { + "user_id": user_id, + "prompt": prompt, + "generation": generation, + "assistant_id": assistant_id, + "properties": properties, + "timestamp": ( + timestamp.isoformat() + if timestamp + else datetime.now(timezone.utc).isoformat() + ), + "latency": latency, + "event_type": EventTypes.llm_event, + } + + with self._lock: + self.queue.append(llm_event_dict) + self.logger.debug( + f"LLM event by user `{user_id}` has been added to queue." + ) def flush(self): """Flush the event queue.""" @@ -167,14 +168,7 @@ def flush(self): if events: for batch_id in range(0, len(events), self.flush_batch_size): batch = events[batch_id : batch_id + self.flush_batch_size] - success = self._post(batch) - - if not success: - self.logger.warning( - f"Retrying flush of batch {batch_id} of {len(batch)} events." 
- ) - time.sleep(5) - self._post(batch) + self._process_batch(batch, batch_id) self.last_flush_time = datetime.now(timezone.utc) self.logger.debug(f"Flush of {len(events)} events completed.") @@ -189,11 +183,39 @@ def close(self): self.flush() self.logger.debug("Background thread stopped and final flush completed.") - def _post(self, events: list[dict]): + def _process_batch(self, batch: list[dict], batch_id: int): + events = [] + llm_events = [] + for event in batch: + if event["event_type"] == EventTypes.llm_event: + event.pop("event_type") + llm_events.append(event) + elif event["event_type"] == EventTypes.event: + event.pop("event_type") + events.append(event) + + events_success = self._post(events, IngestionEndpoints.events.value, EventTypes.event) + llm_events_success = self._post(llm_events, IngestionEndpoints.llm_events.value, EventTypes.llm_event) + + if not events_success: + self.logger.warning( + f"Retrying flush of batch {batch_id} of {len(events)} events." + ) + time.sleep(5) + self._post(events, IngestionEndpoints.events.value, EventTypes.event) + + if not llm_events_success: + self.logger.warning( + f"Retrying flush of batch {batch_id} of {len(llm_events)} llm_events." 
+ ) + time.sleep(5) + self._post(llm_events, IngestionEndpoints.llm_events.value, EventTypes.llm_event) + + def _post(self, events: list[dict], endpoint: str, event_type: EventTypes): with requests.Session() as session: try: response = session.post( - f"{self.host}/publish_events", + f"{self.host}/{endpoint}", headers={ "Content-Type": "application/json", "x-api-key": self.api_key, @@ -201,7 +223,7 @@ def _post(self, events: list[dict]): json=events, ) response.raise_for_status() - self.logger.info(f"{len(events)} events sent to Trubrics.") + self.logger.info(f"{len(events)} {event_type.value} sent to Trubrics.") return True except requests.exceptions.HTTPError as e: error_message = response.text if response.status_code != 200 else str(e) @@ -212,12 +234,12 @@ def _post(self, events: list[dict]): except json.JSONDecodeError: pass self.logger.error( - f"Error flushing {len(events)} events: {error_message}" + f"Error flushing {len(events)} {event_type.value}: {error_message}" ) return False except Exception as e: self.logger.error( - f"Unexpected error flushing {len(events)} events: {e}" + f"Unexpected error flushing {len(events)} {event_type.value}: {e}" ) return False From 38f62dafc1a4c7f722e3866e13f424c500a690cf Mon Sep 17 00:00:00 2001 From: guillaumefontan Date: Mon, 24 Feb 2025 17:19:53 +0100 Subject: [PATCH 2/7] changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 186db9f..16f9173 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Change logging level to debug for flushing events and adding event to queue +### Added +- LLM events are now routed to the new publish_llm_events endpoint + ## [1.8.2] - 2025-02-10 ### Fixed - Added missing flushing limit values, and setup automated flush if limit is reached From f30df96e632660f7eee5cc4e995ebf0b9d83e5ab Mon Sep 17 00:00:00 2001 From: guillaumefontan Date: 
Mon, 24 Feb 2025 17:24:24 +0100 Subject: [PATCH 3/7] code check --- trubrics/enums.py | 3 ++- trubrics/main.py | 22 ++++++++++++++-------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/trubrics/enums.py b/trubrics/enums.py index 65b12b0..929b621 100644 --- a/trubrics/enums.py +++ b/trubrics/enums.py @@ -8,4 +8,5 @@ class EventTypes(Enum): class IngestionEndpoints(Enum): events = "publish_events" - llm_events = "publish_llm_events" \ No newline at end of file + llm_events = "publish_llm_events" + \ No newline at end of file diff --git a/trubrics/main.py b/trubrics/main.py index f280499..84bb231 100644 --- a/trubrics/main.py +++ b/trubrics/main.py @@ -13,7 +13,7 @@ MAX_FLUSH_BATCH_SIZE, MIN_FLUSH_INTERVAL, ) -from trubrics.enums import IngestionEndpoints, EventTypes +from trubrics.enums import EventTypes, IngestionEndpoints from trubrics.logger import trubrics_logger @@ -149,9 +149,7 @@ def track_llm( with self._lock: self.queue.append(llm_event_dict) - self.logger.debug( - f"LLM event by user `{user_id}` has been added to queue." - ) + self.logger.debug(f"LLM event by user `{user_id}` has been added to queue.") def flush(self): """Flush the event queue.""" @@ -194,22 +192,30 @@ def _process_batch(self, batch: list[dict], batch_id: int): event.pop("event_type") events.append(event) - events_success = self._post(events, IngestionEndpoints.events.value, EventTypes.event) - llm_events_success = self._post(llm_events, IngestionEndpoints.llm_events.value, EventTypes.llm_event) + events_success = self._post( + events, IngestionEndpoints.events.value, EventTypes.event + ) + llm_events_success = self._post( + llm_events, IngestionEndpoints.llm_events.value, EventTypes.llm_event + ) if not events_success: self.logger.warning( f"Retrying flush of batch {batch_id} of {len(events)} events." 
) time.sleep(5) - self._post(events, IngestionEndpoints.events.value, EventTypes.event) + self._post( + events, IngestionEndpoints.events.value, EventTypes.event + ) if not llm_events_success: self.logger.warning( f"Retrying flush of batch {batch_id} of {len(llm_events)} llm_events." ) time.sleep(5) - self._post(llm_events, IngestionEndpoints.llm_events.value, EventTypes.llm_event) + self._post( + llm_events, IngestionEndpoints.llm_events.value, EventTypes.llm_event + ) def _post(self, events: list[dict], endpoint: str, event_type: EventTypes): with requests.Session() as session: From e8a2f762a5167ca70035a0e2f095f65d640a6804 Mon Sep 17 00:00:00 2001 From: guillaumefontan Date: Mon, 24 Feb 2025 17:31:20 +0100 Subject: [PATCH 4/7] pre commit --- examples/sdk_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/sdk_test.py b/examples/sdk_test.py index cfa6dff..b879136 100644 --- a/examples/sdk_test.py +++ b/examples/sdk_test.py @@ -3,7 +3,7 @@ from trubrics import Trubrics -trubrics = Trubrics(api_key=os.environ["TRUBRICS_API_KEY"], host="http://127.0.0.1:8001/api/ingestion") +trubrics = Trubrics(api_key=os.environ["TRUBRICS_API_KEY"]) trubrics.logger.setLevel(logging.DEBUG) @@ -11,7 +11,7 @@ trubrics.track(event="Page view", user_id="user_id", properties={"page": "events"}) trubrics.track_llm( - user_id="user_id", + user_id="sdk_test", prompt="What is Trubrics?", assistant_id="gpt4o", generation="Trubrics is a product analytics platform for AI applications.", From 23ef0919ce756aa83c8ce0cdf80d97b4a39b0540 Mon Sep 17 00:00:00 2001 From: guillaumefontan Date: Mon, 24 Feb 2025 17:33:18 +0100 Subject: [PATCH 5/7] pre commit --- .pre-commit-config.yaml | 6 +++--- trubrics/enums.py | 1 - trubrics/main.py | 6 ++---- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 64bd810..ba5dc9e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - 
repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.5.0 + rev: v5.0.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer @@ -8,7 +8,7 @@ repos: - id: check-added-large-files - id: check-json - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.1.6 + rev: v0.9.6 hooks: - id: ruff-format - id: ruff @@ -17,7 +17,7 @@ repos: - id: ruff name: ruff-linter - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.8.0 + rev: v1.15.0 hooks: - id: mypy additional_dependencies: ['types-requests'] diff --git a/trubrics/enums.py b/trubrics/enums.py index 929b621..97c7138 100644 --- a/trubrics/enums.py +++ b/trubrics/enums.py @@ -9,4 +9,3 @@ class EventTypes(Enum): class IngestionEndpoints(Enum): events = "publish_events" llm_events = "publish_llm_events" - \ No newline at end of file diff --git a/trubrics/main.py b/trubrics/main.py index 84bb231..a8c14bc 100644 --- a/trubrics/main.py +++ b/trubrics/main.py @@ -13,7 +13,7 @@ MAX_FLUSH_BATCH_SIZE, MIN_FLUSH_INTERVAL, ) -from trubrics.enums import EventTypes, IngestionEndpoints +from trubrics.enums import EventTypes, IngestionEndpoints from trubrics.logger import trubrics_logger @@ -204,9 +204,7 @@ def _process_batch(self, batch: list[dict], batch_id: int): f"Retrying flush of batch {batch_id} of {len(events)} events." 
) time.sleep(5) - self._post( - events, IngestionEndpoints.events.value, EventTypes.event - ) + self._post(events, IngestionEndpoints.events.value, EventTypes.event) if not llm_events_success: self.logger.warning( From 82c3497c30dd7a89ac7e9e353eb335a19e17e8d7 Mon Sep 17 00:00:00 2001 From: guillaumefontan Date: Mon, 24 Feb 2025 18:12:44 +0100 Subject: [PATCH 6/7] logging --- trubrics/main.py | 46 ++++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/trubrics/main.py b/trubrics/main.py index a8c14bc..ef2a366 100644 --- a/trubrics/main.py +++ b/trubrics/main.py @@ -192,30 +192,40 @@ def _process_batch(self, batch: list[dict], batch_id: int): event.pop("event_type") events.append(event) - events_success = self._post( - events, IngestionEndpoints.events.value, EventTypes.event - ) + events_success = self._post(events, IngestionEndpoints.events, EventTypes.event) llm_events_success = self._post( - llm_events, IngestionEndpoints.llm_events.value, EventTypes.llm_event + llm_events, IngestionEndpoints.llm_events, EventTypes.llm_event ) if not events_success: - self.logger.warning( - f"Retrying flush of batch {batch_id} of {len(events)} events." + self._retry_post( + events, IngestionEndpoints.events, EventTypes.event, batch_id ) - time.sleep(5) - self._post(events, IngestionEndpoints.events.value, EventTypes.event) if not llm_events_success: - self.logger.warning( - f"Retrying flush of batch {batch_id} of {len(llm_events)} llm_events." 
- ) - time.sleep(5) - self._post( - llm_events, IngestionEndpoints.llm_events.value, EventTypes.llm_event + self._retry_post( + llm_events, + IngestionEndpoints.llm_events, + EventTypes.llm_event, + batch_id, ) - def _post(self, events: list[dict], endpoint: str, event_type: EventTypes): + def _retry_post( + self, + events: list[dict], + endpoint: IngestionEndpoints, + event_type: EventTypes, + batch_id: int, + ): + self.logger.warning( + f"Retrying flush of {len(events)} {event_type.value}s in batch {batch_id}" + ) + time.sleep(5) + self._post(events, endpoint, event_type) + + def _post( + self, events: list[dict], endpoint: IngestionEndpoints, event_type: EventTypes + ): with requests.Session() as session: try: response = session.post( @@ -227,7 +237,7 @@ def _post(self, events: list[dict], endpoint: str, event_type: EventTypes): json=events, ) response.raise_for_status() - self.logger.info(f"{len(events)} {event_type.value} sent to Trubrics.") + self.logger.info(f"{len(events)} {event_type.value}s sent to Trubrics.") return True except requests.exceptions.HTTPError as e: error_message = response.text if response.status_code != 200 else str(e) @@ -238,12 +248,12 @@ def _post(self, events: list[dict], endpoint: str, event_type: EventTypes): except json.JSONDecodeError: pass self.logger.error( - f"Error flushing {len(events)} {event_type.value}: {error_message}" + f"Error flushing {len(events)} {event_type.value}s: {error_message}" ) return False except Exception as e: self.logger.error( - f"Unexpected error flushing {len(events)} {event_type.value}: {e}" + f"Unexpected error flushing {len(events)} {event_type.value}s: {e}" ) return False From 20f762820533cf86ceb6d288273f5d92d1fa1b74 Mon Sep 17 00:00:00 2001 From: guillaumefontan Date: Mon, 24 Feb 2025 18:13:48 +0100 Subject: [PATCH 7/7] fix endpoint --- trubrics/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trubrics/main.py b/trubrics/main.py index ef2a366..e6ec2c2 100644 --- 
a/trubrics/main.py +++ b/trubrics/main.py @@ -229,7 +229,7 @@ def _post( with requests.Session() as session: try: response = session.post( - f"{self.host}/{endpoint}", + f"{self.host}/{endpoint.value}", headers={ "Content-Type": "application/json", "x-api-key": self.api_key,