Skip to content

fix: limit number of steps in batch #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/neptune_scale/core/components/aggregating_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from neptune_scale.parameters import (
BATCH_WAIT_TIME_SECONDS,
MAX_BATCH_SIZE,
MAX_BATCH_SNAPSHOT_SIZE,
MAX_QUEUE_ELEMENT_SIZE,
)

Expand All @@ -33,10 +34,12 @@ def __init__(
max_queue_size: int,
max_elements_in_batch: int = MAX_BATCH_SIZE,
max_queue_element_size: int = MAX_QUEUE_ELEMENT_SIZE,
max_snapshots_in_batch: int = MAX_BATCH_SNAPSHOT_SIZE,
wait_time: float = BATCH_WAIT_TIME_SECONDS,
) -> None:
self._max_queue_size = max_queue_size
self._max_elements_in_batch = max_elements_in_batch
self._max_snapshots_in_batch = max_snapshots_in_batch
self._max_queue_element_size = max_queue_element_size
self._wait_time = wait_time

Expand Down Expand Up @@ -110,6 +113,10 @@ def get(self) -> BatchedOperations:
logger.debug("Batch closed due to size limit %s", batch_bytes + element.metadata_size)
break

if batch_operations and len(batch_operations) >= self._max_snapshots_in_batch:
logger.debug("Batch closed due to limit of snapshots in batch %s", len(batch_operations))
break

new_operation = RunOperation()
new_operation.ParseFromString(element.operation)
if element.batch_key != last_batch_key:
Expand Down
2 changes: 1 addition & 1 deletion src/neptune_scale/core/components/queue_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class BatchedOperations(NamedTuple):
sequence_id: int
# Timestamp of the last operation in the batch
timestamp: float
# Protobuf serialized (RunOperationBatch)
# Protobuf serialized (RunOperation)
operation: bytes


Expand Down
1 change: 1 addition & 0 deletions src/neptune_scale/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# Operations queue
MAX_BATCH_SIZE = 100000
MAX_QUEUE_SIZE = 1000000
MAX_BATCH_SNAPSHOT_SIZE = 2000
MAX_MULTIPROCESSING_QUEUE_SIZE = 32767
MAX_QUEUE_ELEMENT_SIZE = 1024 * 1024 # 1MB
# Wait up to this many seconds for incoming operations before submitting a batch
Expand Down
39 changes: 39 additions & 0 deletions tests/unit/test_aggregating_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,45 @@ def test__queue_element_size_limit_with_different_steps():
assert queue.get() == BatchedOperations(sequence_id=2, timestamp=element2.timestamp, operation=element2.operation)


def test__batch_snapshot_limit():
# given
update1 = UpdateRunSnapshot(step=Step(whole=1), assign={f"aa{i}": Value(int64=(i * 97)) for i in range(2)})
update2 = UpdateRunSnapshot(step=Step(whole=2), assign={f"bb{i}": Value(int64=(i * 25)) for i in range(2)})

# and
operation1 = RunOperation(update=update1, project="project", run_id="run_id")
operation2 = RunOperation(update=update2, project="project", run_id="run_id")

# and
element1 = SingleOperation(
sequence_id=1,
timestamp=time.process_time(),
operation=operation1.SerializeToString(),
is_batchable=True,
metadata_size=update1.ByteSize(),
batch_key=1.0,
)
element2 = SingleOperation(
sequence_id=2,
timestamp=time.process_time(),
operation=operation2.SerializeToString(),
is_batchable=True,
metadata_size=update2.ByteSize(),
batch_key=2.0,
)

# and
queue = AggregatingQueue(max_queue_size=2, max_elements_in_batch=2, max_snapshots_in_batch=1)

# when
queue.put_nowait(element=element1)
queue.put_nowait(element=element2)

# then
assert queue.get() == BatchedOperations(sequence_id=1, timestamp=element1.timestamp, operation=element1.operation)
assert queue.get() == BatchedOperations(sequence_id=2, timestamp=element2.timestamp, operation=element2.operation)


@freeze_time("2024-09-01")
def test__not_merge_two_run_creation():
# given
Expand Down
Loading