Skip to content

fix: limit number of steps in batch #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/neptune_scale/core/components/aggregating_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from neptune_scale.parameters import (
BATCH_WAIT_TIME_SECONDS,
MAX_BATCH_SIZE,
MAX_BATCH_SNAPSHOT_SIZE,
MAX_QUEUE_ELEMENT_SIZE,
)

Expand All @@ -33,10 +34,12 @@ def __init__(
max_queue_size: int,
max_elements_in_batch: int = MAX_BATCH_SIZE,
max_queue_element_size: int = MAX_QUEUE_ELEMENT_SIZE,
max_snapshots_in_batch: int = MAX_BATCH_SNAPSHOT_SIZE,
wait_time: float = BATCH_WAIT_TIME_SECONDS,
) -> None:
self._max_queue_size = max_queue_size
self._max_elements_in_batch = max_elements_in_batch
self._max_snapshots_in_batch = max_snapshots_in_batch
self._max_queue_element_size = max_queue_element_size
self._wait_time = wait_time

Expand Down Expand Up @@ -94,6 +97,10 @@ def get(self) -> BatchedOperations:
break

if not batch_operations or element.batch_key != last_batch_key:
if batch_operations and len(batch_operations) >= self._max_snapshots_in_batch:
logger.debug("Batch closed due to limit of snapshots in batch %s", len(batch_operations))
break

new_operation = RunOperation()
new_operation.ParseFromString(element.operation)
batch_operations.append(new_operation)
Expand Down Expand Up @@ -125,7 +132,7 @@ def get(self) -> BatchedOperations:
self.commit()

if not element.is_batchable:
logger.debug("Batch closed due to first not being batchable")
logger.debug("Batch closed due to the last element not being batchable")
break

t1 = time.monotonic()
Expand Down
1 change: 1 addition & 0 deletions src/neptune_scale/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# Operations queue
MAX_BATCH_SIZE = 100000
MAX_QUEUE_SIZE = 1000000
MAX_BATCH_SNAPSHOT_SIZE = 2000
MAX_MULTIPROCESSING_QUEUE_SIZE = 32767
MAX_QUEUE_ELEMENT_SIZE = 1024 * 1024 # 1MB
# Wait up to this many seconds for incoming operations before submitting a batch
Expand Down
40 changes: 40 additions & 0 deletions tests/unit/test_aggregating_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,46 @@ def test__batching():
assert all(k in batch.update.assign for k in ["aa0", "aa1", "bb0", "bb1"])


@freeze_time("2024-09-01")
def test__batch_snapshot_limit():
# given
update1 = UpdateRunSnapshot(step=Step(whole=1), assign={f"aa{i}": Value(int64=(i * 97)) for i in range(2)})
update2 = UpdateRunSnapshot(step=Step(whole=2), assign={f"bb{i}": Value(int64=(i * 25)) for i in range(2)})

# and
operation1 = RunOperation(update=update1, project="project", run_id="run_id")
operation2 = RunOperation(update=update2, project="project", run_id="run_id")

# and
element1 = SingleOperation(
sequence_id=1,
timestamp=time.process_time(),
operation=operation1.SerializeToString(),
is_batchable=True,
metadata_size=update1.ByteSize(),
batch_key=1.0,
)
element2 = SingleOperation(
sequence_id=2,
timestamp=time.process_time(),
operation=operation2.SerializeToString(),
is_batchable=True,
metadata_size=update2.ByteSize(),
batch_key=2.0,
)

# and
queue = AggregatingQueue(max_queue_size=2, max_elements_in_batch=2, max_snapshots_in_batch=1)

# when
queue.put_nowait(element=element1)
queue.put_nowait(element=element2)

# then
assert queue.get() == BatchedOperations(sequence_id=1, timestamp=element1.timestamp, operation=element1.operation)
assert queue.get() == BatchedOperations(sequence_id=2, timestamp=element2.timestamp, operation=element2.operation)


@freeze_time("2024-09-01")
def test__not_merge_two_run_creation():
# given
Expand Down
Loading