Allow step to be None #98


Closed · wants to merge 4 commits
61 changes: 45 additions & 16 deletions src/neptune_scale/api/attribute.py
@@ -1,7 +1,11 @@
 import functools
 import itertools
 import threading
-from datetime import datetime
+import warnings
+from datetime import (
+    datetime,
+    timezone,
+)
 from typing import (
     Any,
     Callable,
@@ -16,6 +20,7 @@
     cast,
 )
 
+from neptune_scale.exceptions import NeptuneSeriesStepNonIncreasing
 from neptune_scale.sync.metadata_splitter import MetadataSplitter
 from neptune_scale.sync.operations_queue import OperationsQueue
 
@@ -61,6 +66,11 @@ def __init__(self, project: str, run_id: str, operations_queue: OperationsQueue)
         self._run_id = run_id
         self._operations_queue = operations_queue
         self._attributes: Dict[str, Attribute] = {}
+        # Keep a per-path mapping of (last step, last value) to detect non-increasing steps
+        # at the call site. The backend will detect this error as well, but it's more convenient
+        # for the user to get the error as soon as possible.
+        self._metric_state: Dict[str, Tuple[float, float]] = {}
+        self._lock = threading.RLock()
 
     def __getitem__(self, path: str) -> "Attribute":
         path = cleanup_path(path)
@@ -87,22 +97,41 @@ def log(
     ) -> None:
         if timestamp is None:
             timestamp = datetime.now()
-        elif isinstance(timestamp, float):
-            timestamp = datetime.fromtimestamp(timestamp)
+        elif isinstance(timestamp, (float, int)):
+            timestamp = datetime.fromtimestamp(timestamp, timezone.utc)
 
+        with self._lock:
+            self._verify_and_update_metrics_state(step, metrics)
+
+        # TODO: Move splitting into the worker process. Here we should just send messages as they are.
         splitter: MetadataSplitter = MetadataSplitter(
             project=self._project,
             run_id=self._run_id,
             step=step,
             timestamp=timestamp,
             configs=configs,
             metrics=metrics,
             add_tags=tags_add,
             remove_tags=tags_remove,
         )
 
         for operation, metadata_size in splitter:
-            self._operations_queue.enqueue(operation=operation, size=metadata_size, key=step)
+            self._operations_queue.enqueue(operation=operation, size=metadata_size, step=step)
+
+    def _verify_and_update_metrics_state(self, step: Optional[float], metrics: Optional[Dict[str, float]]) -> None:
+        """Check that the step provided for metrics is increasing; raise `NeptuneSeriesStepNonIncreasing` if not."""
+
+        if step is None or metrics is None:
+            return
+
+        for metric, value in metrics.items():
+            if (state := self._metric_state.get(metric)) is not None:
+                last_step, last_value = state
+                # Repeating a step is fine as long as the value does not change
+                if step < last_step or (step == last_step and value != last_value):
+                    raise NeptuneSeriesStepNonIncreasing()
+
+            self._metric_state[metric] = (step, value)
 
 
 class Attribute:
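
To make the new call-site check concrete, here is a minimal, self-contained sketch of the same rule. The MetricState class and the local exception stub are illustrative only; in the PR, the state lives on AttributeStore and the exception comes from neptune_scale.exceptions.

from typing import Dict, Optional, Tuple

class NeptuneSeriesStepNonIncreasing(Exception):
    """Local stub of the library exception, for illustration only."""

class MetricState:
    def __init__(self) -> None:
        # metric path -> (last step, last value)
        self._state: Dict[str, Tuple[float, float]] = {}

    def verify_and_update(self, step: Optional[float], metrics: Optional[Dict[str, float]]) -> None:
        if step is None or metrics is None:
            return  # step=None means the backend assigns the next step
        for metric, value in metrics.items():
            if (last := self._state.get(metric)) is not None:
                last_step, last_value = last
                # Repeating a step is fine as long as the value does not change
                if step < last_step or (step == last_step and value != last_value):
                    raise NeptuneSeriesStepNonIncreasing()
            self._state[metric] = (step, value)

state = MetricState()
state.verify_and_update(1.0, {"loss": 0.5})
state.verify_and_update(1.0, {"loss": 0.5})  # same step, same value: accepted
state.verify_and_update(2.0, {"loss": 0.4})  # increasing step: accepted
# state.verify_and_update(1.0, {"loss": 0.3})  # lower step than 2.0: would raise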
@@ -130,7 +159,7 @@ def append(
         self,
         value: Union[Dict[str, Any], float],
         *,
-        step: Union[float, int],
+        step: Optional[Union[float, int]] = None,
         timestamp: Optional[Union[float, datetime]] = None,
         wait: bool = False,
         **kwargs: Any,
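
A short usage sketch of the relaxed append signature. It assumes run is an initialized Run and that run[path] resolves to an Attribute as in api/attribute.py; only the optional step is actually part of this diff.

run["train/loss"].append(0.8, step=0)  # explicit step, as before
run["train/loss"].append(0.7)          # step omitted: the backend assigns the next step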
2 changes: 1 addition & 1 deletion src/neptune_scale/api/run.py
@@ -403,7 +403,7 @@ def __setitem__(self, key: str, value: Any) -> None:
     def log_metrics(
         self,
         data: Dict[str, Union[float, int]],
-        step: Union[float, int],
+        step: Optional[Union[float, int]] = None,
         *,
         timestamp: Optional[datetime] = None,
     ) -> None:
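The public-API effect of the new default, as a sketch (assumes an initialized Run named run; the step-assignment behavior matches the e2e tests added below):

run.log_metrics(data={"loss": 0.9}, step=0)  # explicit step, unchanged behavior
run.log_metrics(data={"loss": 0.8})          # step=None: the backend assigns step 1
run.log_metrics(data={"loss": 0.7})          # ...then step 2, and so on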
25 changes: 21 additions & 4 deletions src/neptune_scale/sync/aggregating_queue.py
@@ -72,7 +72,7 @@ def get(self) -> BatchedOperations:
         start = time.monotonic()
 
         batch_operations: list[RunOperation] = []
-        last_batch_key: Optional[float] = None
+        current_batch_step: Optional[float] = None
         batch_sequence_id: Optional[int] = None
         batch_timestamp: Optional[float] = None
 
@@ -97,7 +97,7 @@
                 new_operation = RunOperation()
                 new_operation.ParseFromString(element.operation)
                 batch_operations.append(new_operation)
-                last_batch_key = element.batch_key
+                current_batch_step = element.step
                 batch_bytes += len(element.operation)
             else:
                 if not element.is_batchable:
@@ -112,9 +112,26 @@
 
                 new_operation = RunOperation()
                 new_operation.ParseFromString(element.operation)
-                if element.batch_key != last_batch_key:
+
+                # This is where we decide if we need to wrap up the current UpdateSnapshot and start a new one.
+                # This happens if the step changes, but also if it is None.
+                # On None, the backend will assign the next available step. This is why we cannot merge here,
+                # especially considering metrics, since we would overwrite them:
+                #
+                #   log metric1=1.0, step=None
+                #   log metric1=1.2, step=None
+                #
+                # After merging by step, we would end up with a single value (the most recent one).
+                #
+                # TODO: we could potentially keep merging until we encounter a metric already seen in this batch.
+                #       Something to optimize in the future. Given the metrics:
+                #           m1, m2, m3, m4, m1, m2, m3, ...
+                #       we could batch up to m4 and close the batch when encountering m1, as long as steps are None.
+                #       We could also keep batching if there are no metrics in a given operation, although this
+                #       would not be a common case.
+                if element.step is None or element.step != current_batch_step:
                     batch_operations.append(new_operation)
-                    last_batch_key = element.batch_key
+                    current_batch_step = element.step
                 else:
                     merge_run_operation(batch_operations[-1], new_operation)
                 batch_bytes += element.metadata_size
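A tiny sketch of the batching decision above, factored into a predicate for illustration (the helper name is hypothetical; the real code inlines the condition):

from typing import Optional

def starts_new_batch(step: Optional[float], current_batch_step: Optional[float]) -> bool:
    # step=None asks the backend to assign the next step, so two None-step
    # operations may land on different steps and must never be merged.
    return step is None or step != current_batch_step

assert starts_new_batch(None, None)      # auto-step operations stay separate
assert starts_new_batch(2.0, 1.0)        # step changed: close the current batch
assert not starts_new_batch(1.0, 1.0)    # same explicit step: safe to merge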
4 changes: 2 additions & 2 deletions src/neptune_scale/sync/operations_queue.py
@@ -57,7 +57,7 @@ def last_timestamp(self) -> Optional[float]:
         with self._lock:
             return self._last_timestamp
 
-    def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, key: Optional[float] = None) -> None:
+    def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, step: Optional[float] = None) -> None:
         try:
             is_metadata_update = operation.HasField("update")
             serialized_operation = operation.SerializeToString()
@@ -75,7 +75,7 @@ def enqueue(self, *, operation: RunOperation, size: Optional[int] = None, key: Optional[float] = None) -> None:
                     operation=serialized_operation,
                     metadata_size=size,
                     is_batchable=is_metadata_update,
-                    batch_key=key,
+                    step=step,
                 ),
                 block=True,
                 timeout=None,
4 changes: 2 additions & 2 deletions src/neptune_scale/sync/queue_element.py
@@ -26,5 +26,5 @@ class SingleOperation(NamedTuple):
     is_batchable: bool
     # Size of the metadata in the operation (without project, family, run_id etc.)
     metadata_size: Optional[int]
-    # Update metadata key
-    batch_key: Optional[float]
+    # Step provided by the user
+    step: Optional[float]
2 changes: 2 additions & 0 deletions src/neptune_scale/sync/sync_process.py
@@ -420,6 +420,8 @@ def submit(self, *, operation: RunOperation) -> Optional[SubmitResponse]:
 
     def work(self) -> None:
         try:
+            # TODO: is there a point in serializing the data on AggregatingQueue? It does not move between
+            #       processes, so we could just pass around instances of RunOperation.
             while (operation := self.get_next()) is not None:
                 sequence_id, timestamp, data = operation
 
63 changes: 63 additions & 0 deletions tests/e2e/test_log_and_fetch.py
@@ -164,3 +164,66 @@ def test_single_non_finite_metric(value, sync_run, ro_run):
     path = unique_path("test_series/non_finite")
     sync_run.log_metrics(data={path: value}, step=1)
     assert path not in refresh(ro_run).field_names
+
+
+@mark.parametrize("first_step", [None, 0, 10])
+def test_auto_step_with_initial_step(run, ro_run, first_step):
+    """Logging series values with step=None results in backend-side step assignment"""
+
+    path = unique_path(f"test_series/auto_step_{first_step}")
+
+    _, values = random_series()
+
+    run.log_metrics(data={path: values[0]}, step=first_step)
+    for value in values[1:]:
+        run.log_metrics(data={path: value})
+
+    run.wait_for_processing()
+
+    # The backend assigns steps starting from zero by default,
+    # so account for that in this test case
+    if first_step is None:
+        first_step = 0
+
+    df = ro_run[path].fetch_values()
+    assert df["step"].tolist() == [float(x) for x in list(range(first_step, first_step + len(values)))]
+    assert df["value"].tolist() == values
+
+
+def test_auto_step_with_manual_increase(run, ro_run):
+    """Increase the step manually at a single point in the series, then use auto-step"""
+
+    path = unique_path("test_series/auto_step_increase")
+    run.log_metrics(data={path: 1})
+    run.log_metrics(data={path: 2}, step=10)
+    run.log_metrics(data={path: 3})
+
+    run.wait_for_processing()
+
+    df = ro_run[path].fetch_values()
+    assert df["step"].tolist() == [0, 10, 11]
+    assert df["value"].tolist() == [1, 2, 3]
+
+
+def test_auto_step_with_different_metrics(run, ro_run):
+    path1 = unique_path("test_series/auto_step_different_metrics1")
+    path2 = unique_path("test_series/auto_step_different_metrics2")
+
+    run.log_metrics(data={path1: 1})
+    run.log_metrics(data={path2: 1}, step=10)
+
+    run.log_metrics(data={path1: 2})
+    run.log_metrics(data={path2: 2}, step=20)
+
+    run.log_metrics(data={path1: 3}, step=5)
+    run.log_metrics(data={path2: 3})
+
+    run.wait_for_processing()
+
+    df1 = ro_run[path1].fetch_values()
+    assert df1["step"].tolist() == [0.0, 1.0, 5.0]
+    assert df1["value"].tolist() == [1, 2, 3]
+
+    df2 = ro_run[path2].fetch_values()
+    assert df2["step"].tolist() == [10.0, 20.0, 21.0]
+    assert df2["value"].tolist() == [1, 2, 3]