From 3dd91d05e7c3607085d83d1dbbd117e5100ee64c Mon Sep 17 00:00:00 2001
From: Willy Douhard
Date: Mon, 23 Sep 2024 10:36:13 +0200
Subject: [PATCH 1/4] feat: serialize chat messages

---
 literalai/callback/langchain_callback.py | 58 +++++++++++++-----------
 1 file changed, 32 insertions(+), 26 deletions(-)

diff --git a/literalai/callback/langchain_callback.py b/literalai/callback/langchain_callback.py
index 0620d47..29c9b95 100644
--- a/literalai/callback/langchain_callback.py
+++ b/literalai/callback/langchain_callback.py
@@ -1,16 +1,6 @@
 import time
 from importlib.metadata import version
-from typing import (
-    TYPE_CHECKING,
-    Any,
-    Dict,
-    List,
-    Optional,
-    Tuple,
-    TypedDict,
-    Union,
-    cast,
-)
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypedDict, Union, cast
 
 from literalai.helper import ensure_values_serializable
 from literalai.observability.generation import (
@@ -28,17 +18,6 @@
     from literalai.observability.step import TrueStepType
 
 
-def process_content(content: Any) -> Tuple[Dict, Optional[str]]:
-    if content is None:
-        return {}, None
-    if isinstance(content, dict):
-        return content, "json"
-    elif isinstance(content, str):
-        return {"content": content}, "text"
-    else:
-        return {"content": str(content)}, "text"
-
-
 def process_variable_value(value: Any) -> str:
     return str(value) if value is not None else ""
 
@@ -151,6 +130,35 @@ def _convert_message(
 
         return msg
 
+    def _is_message(self, to_check: Any) -> bool:
+        return isinstance(to_check, BaseMessage)
+
+    def _is_message_array(self, to_check: Any) -> bool:
+        return isinstance(to_check, list) and all(
+            self._is_message(item) for item in to_check
+        )
+
+    def process_content(self, content: Any, root=True):
+        if content is None:
+            return {}
+        if self._is_message_array(content):
+            if root:
+                return {"messages": [self._convert_message(m) for m in content]}
+            else:
+                return [self._convert_message(m) for m in content]
+        elif self._is_message(content):
+            return self._convert_message(content)
+        elif isinstance(content, dict):
+            processed_dict = {}
+            for key, value in content.items():
+                processed_value = self.process_content(value, root=False)
+                processed_dict[key] = processed_value
+            return processed_dict
+        elif isinstance(content, str):
+            return {"content": content}
+        else:
+            return {"content": str(content)}
+
     def _build_llm_settings(
         self,
         serialized: Dict,
@@ -366,7 +374,6 @@ def _start_trace(self, run: Run) -> None:
             self.generation_inputs[str(run.id)] = ensure_values_serializable(
                 run.inputs
             )
-
         if ignore:
             return
 
@@ -393,7 +400,7 @@ def _start_trace(self, run: Run) -> None:
         )
         step.tags = run.tags
         step.metadata = run.metadata
-        step.input, _ = process_content(run.inputs)
+        step.input = self.process_content(run.inputs)
 
         self.steps[str(run.id)] = step
 
@@ -406,7 +413,6 @@ def _on_run_update(self, run: Run) -> None:
             return
 
         current_step = self.steps.get(str(run.id), None)
-
         if run.run_type == "llm" and current_step:
             provider, model, tools, llm_settings = self._build_llm_settings(
                 (run.serialized or {}), (run.extra or {}).get("invocation_params")
             )
@@ -508,7 +514,7 @@ def _on_run_update(self, run: Run) -> None:
             output = outputs.get(output_keys[0], outputs)
 
             if current_step:
-                current_step.output, _ = process_content(output)
+                current_step.output = self.process_content(output)
                 current_step.end()
 
     def _on_error(self, error: BaseException, *, run_id: "UUID", **kwargs: Any):
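[Reviewer note on PATCH 1/4] The old module-level process_content returned a
(dict, mime_type) tuple and passed dicts through untouched, so LangChain
message objects ended up stringified. The replacement walks inputs
recursively and converts BaseMessage instances into structured data. Below
is a minimal standalone sketch of that behavior, assuming langchain-core's
message classes; the _convert_message stub is illustrative only, not the
callback's actual conversion logic:

    from typing import Any
    from langchain_core.messages import AIMessage, BaseMessage, HumanMessage

    def _convert_message(m: BaseMessage) -> dict:
        # Stub: the real callback maps messages to Literal AI's own schema.
        return {"role": m.type, "content": m.content}

    def process_content(content: Any, root: bool = True):
        # Mirrors the patched method: message lists become
        # {"messages": [...]} at the root and a bare list when nested;
        # dicts are walked recursively; anything else is wrapped under
        # a "content" key.
        if content is None:
            return {}
        if isinstance(content, list) and all(
            isinstance(i, BaseMessage) for i in content
        ):
            msgs = [_convert_message(m) for m in content]
            return {"messages": msgs} if root else msgs
        if isinstance(content, BaseMessage):
            return _convert_message(content)
        if isinstance(content, dict):
            return {k: process_content(v, root=False) for k, v in content.items()}
        if isinstance(content, str):
            return {"content": content}
        return {"content": str(content)}

    print(process_content(
        {"chat_history": [HumanMessage(content="hi"), AIMessage(content="hello")]}
    ))
    # {'chat_history': [{'role': 'human', 'content': 'hi'},
    #                   {'role': 'ai', 'content': 'hello'}]}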
From 8f01ae9fa6638c23232ed5df36748d095a4c3554 Mon Sep 17 00:00:00 2001
From: Willy Douhard
Date: Mon, 23 Sep 2024 10:58:49 +0200
Subject: [PATCH 2/4] chore: ci

---
 literalai/event_processor.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/literalai/event_processor.py b/literalai/event_processor.py
index c605773..e7405fb 100644
--- a/literalai/event_processor.py
+++ b/literalai/event_processor.py
@@ -35,18 +35,17 @@ def __init__(self, api: "LiteralAPI", batch_size: int = 1, disabled: bool = Fals
         self.batch_size = batch_size
         self.api = api
         self.event_queue = queue.Queue()
-        self.processing_thread = threading.Thread(
-            target=self._process_events, daemon=True
-        )
         self.disabled = disabled
         self.processing_counter = 0
         self.counter_lock = threading.Lock()
+        self.stop_event = threading.Event()
         self.last_batch_time = time.time()
+        self.processing_thread = threading.Thread(
+            target=self._process_events, daemon=True
+        )
         if not self.disabled:
             self.processing_thread.start()
 
-        self.stop_event = threading.Event()
-
     def add_event(self, event: "StepDict"):
         with self.counter_lock:
             self.processing_counter += 1

From 516ac90234b3cdd457599932df0733b011f5f7ec Mon Sep 17 00:00:00 2001
From: Hugues de Saxcé
Date: Mon, 23 Sep 2024 11:32:39 +0200
Subject: [PATCH 3/4] fix: move back stop event

---
 literalai/event_processor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/literalai/event_processor.py b/literalai/event_processor.py
index e7405fb..8cf1fce 100644
--- a/literalai/event_processor.py
+++ b/literalai/event_processor.py
@@ -38,13 +38,13 @@ def __init__(self, api: "LiteralAPI", batch_size: int = 1, disabled: bool = Fals
         self.disabled = disabled
         self.processing_counter = 0
         self.counter_lock = threading.Lock()
-        self.stop_event = threading.Event()
         self.last_batch_time = time.time()
         self.processing_thread = threading.Thread(
             target=self._process_events, daemon=True
         )
         if not self.disabled:
             self.processing_thread.start()
+        self.stop_event = threading.Event()
 
     def add_event(self, event: "StepDict"):
         with self.counter_lock:
             self.processing_counter += 1

From 2e3967a0750d5d451847e0ed97298c515e9a7876 Mon Sep 17 00:00:00 2001
From: Hugues de Saxcé
Date: Mon, 23 Sep 2024 12:46:28 +0200
Subject: [PATCH 4/4] fix: force httpx 0.30.0 for python 3.9

---
 requirements-dev.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements-dev.txt b/requirements-dev.txt
index 5230191..a63498a 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,11 +1,11 @@
 pytest
 pytest-asyncio
 pytest-timeout
+pytest_httpx==0.30.0
 pre-commit
 python-dotenv
 ruff
 mypy
 langchain
 llama-index
-pytest_httpx
 mistralai
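[Reviewer note on PATCHES 2/4 and 3/4] The two commits shuffle where
stop_event is created relative to processing_thread.start(): PATCH 2/4
groups the Thread construction with the rest of the state and creates the
Event before the thread starts, and PATCH 3/4 moves the Event back after
start(), restoring its original position. The underlying hazard such
ordering decisions guard against: any attribute the worker loop reads must
be assigned before start(), or an early first iteration can hit
AttributeError. A generic sketch of the safe ordering, with a hypothetical
worker loop (the diff does not show literalai's actual _process_events
body):

    import threading

    class Worker:
        def __init__(self) -> None:
            # Assign everything the loop reads *before* starting the thread.
            self.stop_event = threading.Event()
            self.thread = threading.Thread(target=self._loop, daemon=True)
            self.thread.start()

        def _loop(self) -> None:
            # Hypothetical body; the real processor batches queued events.
            while not self.stop_event.is_set():
                self.stop_event.wait(0.1)

        def stop(self) -> None:
            self.stop_event.set()
            self.thread.join()

    w = Worker()
    w.stop()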