Skip to content

fix: fix flush with batch size > 1 #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion literalai/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def create_scores(self, scores: List[ScoreDict]):
variables[f"{k}_{id}"] = v

def process_response(response):
return [Score.from_dict(x) for x in response["data"].values()]
return [x for x in response["data"].values()]

return self.gql_helper(query, "create scores", variables, process_response)

Expand Down
6 changes: 3 additions & 3 deletions literalai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class BaseLiteralClient:

def __init__(
self,
batch_size: int = 1,
batch_size: int = 5,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hadn't noticed we'd dropped the batch size :/

is_async: bool = False,
api_key: Optional[str] = None,
url: Optional[str] = None,
Expand Down Expand Up @@ -274,7 +274,7 @@ class LiteralClient(BaseLiteralClient):

def __init__(
self,
batch_size: int = 1,
batch_size: int = 5,
api_key: Optional[str] = None,
url: Optional[str] = None,
environment: Optional[Environment] = None,
Expand All @@ -298,7 +298,7 @@ class AsyncLiteralClient(BaseLiteralClient):

def __init__(
self,
batch_size: int = 1,
batch_size: int = 5,
api_key: Optional[str] = None,
url: Optional[str] = None,
environment: Optional[Environment] = None,
Expand Down
18 changes: 15 additions & 3 deletions literalai/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,21 @@ def __init__(self, api: "LiteralAPI", batch_size: int = 1, disabled: bool = Fals
target=self._process_events, daemon=True
)
self.disabled = disabled
self.processing_counter = 0
self.counter_lock = threading.Lock()
if not self.disabled:
self.processing_thread.start()

self.stop_event = threading.Event()

def add_event(self, event: "StepDict"):
with self.counter_lock:
self.processing_counter += 1
self.event_queue.put(event)

async def a_add_events(self, event: "StepDict"):
with self.counter_lock:
self.processing_counter += 1
await to_thread(self.event_queue.put, event)

def _process_events(self):
Expand All @@ -62,7 +68,7 @@ def _process_events(self):
# No more events at the moment, proceed with processing what's in the batch
pass

# Process the batch if any events are present - in a separate thread
# Process the batch if any events are present
if batch:
self._process_batch(batch)

Expand All @@ -83,19 +89,25 @@ def _process_batch(self, batch: List):
while not self._try_process_batch(batch) and retries < 1:
retries += 1
time.sleep(DEFAULT_SLEEP_TIME)
with self.counter_lock:
self.processing_counter -= len(batch)

def flush_and_stop(self):
self.stop_event.set()
if not self.disabled:
self.processing_thread.join()

async def aflush(self):
while not self.event_queue.empty():
while not self.event_queue.empty() or self._is_processing():
await asyncio.sleep(DEFAULT_SLEEP_TIME)

def flush(self):
while not self.event_queue.empty():
while not self.event_queue.empty() or self._is_processing():
time.sleep(DEFAULT_SLEEP_TIME)

def _is_processing(self):
with self.counter_lock:
return self.processing_counter > 0

def __del__(self):
self.flush_and_stop()
8 changes: 4 additions & 4 deletions tests/e2e/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def broken_client(self):
api_key = os.getenv("LITERAL_API_KEY", None)
assert url is not None and api_key is not None, "Missing environment variables"

client = LiteralClient(batch_size=1, url=url, api_key=api_key)
client = LiteralClient(batch_size=5, url=url, api_key=api_key)
yield client
client.event_processor.flush_and_stop()

Expand All @@ -42,7 +42,7 @@ def client(self):
api_key = os.getenv("LITERAL_API_KEY", None)
assert url is not None and api_key is not None, "Missing environment variables"

client = LiteralClient(batch_size=1, url=url, api_key=api_key)
client = LiteralClient(batch_size=5, url=url, api_key=api_key)
yield client
client.event_processor.flush_and_stop()

Expand All @@ -53,7 +53,7 @@ def staging_client(self):
assert url is not None and api_key is not None, "Missing environment variables"

client = LiteralClient(
batch_size=1, url=url, api_key=api_key, environment="staging"
batch_size=5, url=url, api_key=api_key, environment="staging"
)
yield client
client.event_processor.flush_and_stop()
Expand All @@ -64,7 +64,7 @@ def async_client(self):
api_key = os.getenv("LITERAL_API_KEY", None)
assert url is not None and api_key is not None, "Missing environment variables"

async_client = AsyncLiteralClient(batch_size=1, url=url, api_key=api_key)
async_client = AsyncLiteralClient(batch_size=5, url=url, api_key=api_key)
yield async_client
async_client.event_processor.flush_and_stop()

Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/test_mistralai.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def client(self):
api_key = os.getenv("LITERAL_API_KEY", None)
assert url is not None and api_key is not None, "Missing environment variables"

client = LiteralClient(batch_size=1, url=url, api_key=api_key)
client = LiteralClient(batch_size=5, url=url, api_key=api_key)
client.instrument_mistralai()

return client
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/test_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def client(self):
api_key = os.getenv("LITERAL_API_KEY", None)
assert url is not None and api_key is not None, "Missing environment variables"

client = LiteralClient(batch_size=1, url=url, api_key=api_key)
client = LiteralClient(batch_size=5, url=url, api_key=api_key)
client.instrument_openai()

return client
Expand Down
Loading