From 65128b965611d161c6cb3d6ba0210b8cbb96981d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20So=C5=9Bnicki?= Date: Tue, 19 Nov 2024 13:43:02 +0100 Subject: [PATCH 1/2] fix: limit number of steps in batch --- .../core/components/aggregating_queue.py | 7 ++++ src/neptune_scale/parameters.py | 1 + tests/unit/test_aggregating_queue.py | 39 +++++++++++++++++++ 3 files changed, 47 insertions(+) diff --git a/src/neptune_scale/core/components/aggregating_queue.py b/src/neptune_scale/core/components/aggregating_queue.py index dd0f05ac..b251086f 100644 --- a/src/neptune_scale/core/components/aggregating_queue.py +++ b/src/neptune_scale/core/components/aggregating_queue.py @@ -21,6 +21,7 @@ from neptune_scale.parameters import ( BATCH_WAIT_TIME_SECONDS, MAX_BATCH_SIZE, + MAX_BATCH_SNAPSHOT_SIZE, MAX_QUEUE_ELEMENT_SIZE, ) @@ -33,10 +34,12 @@ def __init__( max_queue_size: int, max_elements_in_batch: int = MAX_BATCH_SIZE, max_queue_element_size: int = MAX_QUEUE_ELEMENT_SIZE, + max_snapshots_in_batch: int = MAX_BATCH_SNAPSHOT_SIZE, wait_time: float = BATCH_WAIT_TIME_SECONDS, ) -> None: self._max_queue_size = max_queue_size self._max_elements_in_batch = max_elements_in_batch + self._max_snapshots_in_batch = max_snapshots_in_batch self._max_queue_element_size = max_queue_element_size self._wait_time = wait_time @@ -110,6 +113,10 @@ def get(self) -> BatchedOperations: logger.debug("Batch closed due to size limit %s", batch_bytes + element.metadata_size) break + if batch_operations and len(batch_operations) >= self._max_snapshots_in_batch: + logger.debug("Batch closed due to limit of snapshots in batch %s", len(batch_operations)) + break + new_operation = RunOperation() new_operation.ParseFromString(element.operation) if element.batch_key != last_batch_key: diff --git a/src/neptune_scale/parameters.py b/src/neptune_scale/parameters.py index 3718721f..5c31516a 100644 --- a/src/neptune_scale/parameters.py +++ b/src/neptune_scale/parameters.py @@ -5,6 +5,7 @@ # Operations queue MAX_BATCH_SIZE = 100000 MAX_QUEUE_SIZE = 1000000 +MAX_BATCH_SNAPSHOT_SIZE = 2000 MAX_MULTIPROCESSING_QUEUE_SIZE = 32767 MAX_QUEUE_ELEMENT_SIZE = 1024 * 1024 # 1MB # Wait up to this many seconds for incoming operations before submitting a batch diff --git a/tests/unit/test_aggregating_queue.py b/tests/unit/test_aggregating_queue.py index 723c3302..3cf3b300 100644 --- a/tests/unit/test_aggregating_queue.py +++ b/tests/unit/test_aggregating_queue.py @@ -218,6 +218,45 @@ def test__queue_element_size_limit_with_different_steps(): assert queue.get() == BatchedOperations(sequence_id=2, timestamp=element2.timestamp, operation=element2.operation) +def test__batch_snapshot_limit(): + # given + update1 = UpdateRunSnapshot(step=Step(whole=1), assign={f"aa{i}": Value(int64=(i * 97)) for i in range(2)}) + update2 = UpdateRunSnapshot(step=Step(whole=2), assign={f"bb{i}": Value(int64=(i * 25)) for i in range(2)}) + + # and + operation1 = RunOperation(update=update1, project="project", run_id="run_id") + operation2 = RunOperation(update=update2, project="project", run_id="run_id") + + # and + element1 = SingleOperation( + sequence_id=1, + timestamp=time.process_time(), + operation=operation1.SerializeToString(), + is_batchable=True, + metadata_size=update1.ByteSize(), + batch_key=1.0, + ) + element2 = SingleOperation( + sequence_id=2, + timestamp=time.process_time(), + operation=operation2.SerializeToString(), + is_batchable=True, + metadata_size=update2.ByteSize(), + batch_key=2.0, + ) + + # and + queue = AggregatingQueue(max_queue_size=2, max_elements_in_batch=2, max_snapshots_in_batch=1) + + # when + queue.put_nowait(element=element1) + queue.put_nowait(element=element2) + + # then + assert queue.get() == BatchedOperations(sequence_id=1, timestamp=element1.timestamp, operation=element1.operation) + assert queue.get() == BatchedOperations(sequence_id=2, timestamp=element2.timestamp, operation=element2.operation) + + @freeze_time("2024-09-01") def test__not_merge_two_run_creation(): # given From 3bdc7cefa4eb42bac084613ca88261b254aca94e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20So=C5=9Bnicki?= Date: Wed, 20 Nov 2024 16:32:45 +0100 Subject: [PATCH 2/2] fix: class name in comment --- src/neptune_scale/core/components/queue_element.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/neptune_scale/core/components/queue_element.py b/src/neptune_scale/core/components/queue_element.py index 1c37ff11..0d8c6193 100644 --- a/src/neptune_scale/core/components/queue_element.py +++ b/src/neptune_scale/core/components/queue_element.py @@ -11,7 +11,7 @@ class BatchedOperations(NamedTuple): sequence_id: int # Timestamp of the last operation in the batch timestamp: float - # Protobuf serialized (RunOperationBatch) + # Protobuf serialized (RunOperation) operation: bytes